You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/09/16 21:45:17 UTC

[1/3] aurora git commit: Batching writes - Part 1 (of 3): Introducing BatchWorker and task event batching.

Repository: aurora
Updated Branches:
  refs/heads/master f1e09a9c7 -> 496397aa5


Batching writes - Part 1 (of 3): Introducing BatchWorker and task event batching.

Reviewed at https://reviews.apache.org/r/51759/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ebfeb3e6
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ebfeb3e6
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ebfeb3e6

Branch: refs/heads/master
Commit: ebfeb3e602faa9281ff7ff50f42bd21885518953
Parents: f1e09a9
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Sep 16 14:17:04 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Sep 16 14:17:04 2016 -0700

----------------------------------------------------------------------
 .../apache/aurora/scheduler/BatchWorker.java    | 254 +++++++++++++++++++
 .../aurora/scheduler/SchedulerModule.java       |  25 ++
 .../scheduler/pruning/TaskHistoryPruner.java    |  14 +-
 .../scheduler/scheduling/TaskThrottler.java     |  28 +-
 .../scheduler/state/MaintenanceController.java  |  14 +-
 .../updater/JobUpdateControllerImpl.java        |  10 +-
 .../aurora/scheduler/BatchWorkerTest.java       |  96 +++++++
 .../pruning/TaskHistoryPrunerTest.java          |  10 +-
 .../scheduler/scheduling/TaskThrottlerTest.java |   9 +-
 .../state/MaintenanceControllerImplTest.java    |   5 +
 .../scheduler/testing/BatchWorkerUtil.java      |  59 +++++
 .../aurora/scheduler/updater/JobUpdaterIT.java  |   7 +-
 12 files changed, 505 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/BatchWorker.java b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
new file mode 100644
index 0000000..e05d4b4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Generic helper that allows bundling multiple work items into a single {@link Storage}
+ * transaction aiming to reduce the write lock contention.
+ *
+ * @param <T> Expected result type.
+ */
+public class BatchWorker<T> extends AbstractExecutionThreadService {
+  /**
+   * Empty result placeholder.
+   */
+  public interface NoResult { }
+
+  /**
+   * Convenience wrapper for a non-repeatable no value work {@link Result}.
+   */
+  public static final NoResult NO_RESULT = new NoResult() { };
+
+  private static final Logger LOG = LoggerFactory.getLogger(BatchWorker.class);
+  private final Storage storage;
+  private final int maxBatchSize;
+  private final SlidingStats batchUnlocked;
+  private final SlidingStats batchLocked;
+  private final BlockingQueue<WorkItem<T>> workQueue = new LinkedBlockingQueue<>();
+  private final ScheduledExecutorService scheduledExecutor;
+  private final AtomicInteger lastBatchSize = new AtomicInteger(0);
+  private final AtomicLong itemsProcessed;
+  private final AtomicLong batchesProcessed;
+
+  /**
+   * Wraps result returned by the {@link RepeatableWork} item.
+   *
+   * @param <T> Expected result type.
+   */
+  public static class Result<T> {
+    private final boolean isCompleted;
+    private final T value;
+
+    /**
+     * Initializes a {@link Result} instance with {@code isCompleted} and {@code value}.
+     * <p>
+     * The {@code isCompleted} may be set to {@code False} for a {@link RepeatableWork} that has
+     * not finished yet. Otherwise, it must be set to {@code True}.
+     *
+     * @param isCompleted Flag indicating if the {@link RepeatableWork} has completed.
+     * @param value result value.
+     */
+    public Result(boolean isCompleted, T value) {
+      this.isCompleted = isCompleted;
+      this.value = value;
+    }
+  }
+
+  /**
+   * Encapsulates a potentially repeatable operation.
+   */
+  public interface RepeatableWork<T> {
+    /**
+     * Abstracts a unit of repeatable (i.e.: "repeat until completed") work.
+     * <p>
+     * The work unit may be repeated as instructed by the {@link Result}.
+     *
+     * @param storeProvider {@link MutableStoreProvider} instance.
+     * @return {@link Result}
+     */
+    Result<T> apply(MutableStoreProvider storeProvider);
+  }
+
+  /**
+   * Encapsulates a non-repeatable operation.
+   */
+  public interface Work<T> extends RepeatableWork<T> {
+    @Override
+    default Result<T> apply(MutableStoreProvider storeProvider) {
+      T value = execute(storeProvider);
+      return new Result<>(true, value);
+    }
+
+    /**
+     * Abstracts a unit of non-repeatable (i.e.: "run exactly once") work.
+     *
+     * @param storeProvider {@link MutableStoreProvider} instance.
+     * @return result value.
+     */
+    T execute(MutableStoreProvider storeProvider);
+  }
+
+  @Inject
+  protected BatchWorker(Storage storage, StatsProvider statsProvider, int maxBatchSize) {
+    this.storage = requireNonNull(storage);
+    this.maxBatchSize = maxBatchSize;
+
+    scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor(serviceName() + "-%d", LOG);
+    statsProvider.makeGauge(serviceName() + "_queue_size", () -> workQueue.size());
+    statsProvider.makeGauge(
+        serviceName() + "_last_processed_batch_size",
+        () -> lastBatchSize.intValue());
+    batchUnlocked = new SlidingStats(serviceName() + "_batch_unlocked", "nanos");
+    batchLocked = new SlidingStats(serviceName() + "_batch_locked", "nanos");
+    itemsProcessed = statsProvider.makeCounter(serviceName() + "_items_processed");
+    batchesProcessed = statsProvider.makeCounter(serviceName() + "_batches_processed");
+  }
+
+  /**
+   * Executes a non-repeatable {@link Work} and returns {@link CompletableFuture} to wait on.
+   *
+   * @param work A non-repeatable {@link Work} to execute.
+   * @return {@link CompletableFuture} to wait on.
+   */
+  public CompletableFuture<T> execute(Work<T> work) {
+    CompletableFuture<T> result = new CompletableFuture<>();
+    workQueue.add(new WorkItem<>(
+        work,
+        result,
+        Optional.empty(),
+        Optional.empty()));
+
+    return result;
+  }
+
+  /**
+   * Executes a {@link RepeatableWork} until it completes and returns {@link CompletableFuture}
+   * to wait on.
+   *
+   * @param backoffStrategy A {@link BackoffStrategy} instance to backoff subsequent runs.
+   * @param work A {@link RepeatableWork} to execute.
+   */
+  public CompletableFuture<T> executeWithReplay(
+      BackoffStrategy backoffStrategy,
+      RepeatableWork<T> work) {
+
+    CompletableFuture<T> result = new CompletableFuture<>();
+    workQueue.add(new WorkItem<>(
+        work,
+        result,
+        Optional.of(backoffStrategy),
+        Optional.of(0L)));
+
+    return result;
+  }
+
+  @Override
+  protected void run() throws Exception {
+    while (isRunning()) {
+      List<WorkItem<T>> batch = new LinkedList<>();
+      batch.add(workQueue.take());
+      workQueue.drainTo(batch, maxBatchSize - batch.size());
+      processBatch(batch);
+    }
+  }
+
+  private void processBatch(List<WorkItem<T>> batch) {
+    if (!batch.isEmpty()) {
+      long unlockedStart = System.nanoTime();
+      storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
+        long lockedStart = System.nanoTime();
+        for (WorkItem<T> item : batch) {
+          try {
+            Result<T> itemResult = item.work.apply(storeProvider);
+            if (itemResult.isCompleted) {
+              item.result.complete(itemResult.value);
+            } else {
+              // Work not finished yet - re-queue for a followup later.
+              long backoffMsec = backoffFor(item);
+              scheduledExecutor.schedule(
+                  () -> workQueue.add(new WorkItem<>(
+                      item.work,
+                      item.result,
+                      item.backoffStrategy,
+                      Optional.of(backoffMsec))),
+                  backoffMsec,
+                  TimeUnit.MILLISECONDS);
+            }
+          } catch (RuntimeException e) {
+            LOG.error("{}: Failed to process batch item. Error: {}", serviceName(), e);
+            item.result.completeExceptionally(e);
+          }
+        }
+        batchLocked.accumulate(System.nanoTime() - lockedStart);
+      });
+      batchUnlocked.accumulate(System.nanoTime() - unlockedStart);
+      batchesProcessed.incrementAndGet();
+      lastBatchSize.set(batch.size());
+      itemsProcessed.addAndGet(batch.size());
+    }
+  }
+
+  private long backoffFor(WorkItem<T> item) {
+    checkState(item.backoffStrategy.isPresent());
+    checkState(item.lastBackoffMsec.isPresent());
+    return item.backoffStrategy.get().calculateBackoffMs(item.lastBackoffMsec.get());
+  }
+
+  private class WorkItem<V> {
+    private final RepeatableWork<V> work;
+    private final CompletableFuture<T> result;
+    private final Optional<BackoffStrategy> backoffStrategy;
+    private final Optional<Long> lastBackoffMsec;
+
+    WorkItem(
+        RepeatableWork<V> work,
+        CompletableFuture<T> result,
+        Optional<BackoffStrategy> backoffStrategy,
+        Optional<Long> lastBackoffMsec) {
+
+      this.work = work;
+      this.result = result;
+      this.backoffStrategy = backoffStrategy;
+      this.lastBackoffMsec = lastBackoffMsec;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 4a7ef0b..2ec3967 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -16,6 +16,8 @@ package org.apache.aurora.scheduler;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+
+import javax.inject.Inject;
 import javax.inject.Singleton;
 
 import com.google.inject.AbstractModule;
@@ -27,10 +29,13 @@ import org.apache.aurora.common.args.CmdLine;
 import org.apache.aurora.common.args.constraints.Positive;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.BatchWorker.NoResult;
 import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +64,11 @@ public class SchedulerModule extends AbstractModule {
       help = "The maximum number of status updates that can be processed in a batch.")
   private static final Arg<Integer> MAX_STATUS_UPDATE_BATCH_SIZE = Arg.create(1000);
 
+  @Positive
+  @CmdLine(name = "max_task_event_batch_size",
+      help = "The maximum number of task state change events that can be processed in a batch.")
+  private static final Arg<Integer> MAX_TASK_EVENT_BATCH_SIZE = Arg.create(300);
+
   @Override
   protected void configure() {
     bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
@@ -93,6 +103,21 @@ public class SchedulerModule extends AbstractModule {
     bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class);
     bind(TaskStatusHandlerImpl.class).in(Singleton.class);
     addSchedulerActiveServiceBinding(binder()).to(TaskStatusHandlerImpl.class);
+
+    bind(TaskEventBatchWorker.class).in(Singleton.class);
+    addSchedulerActiveServiceBinding(binder()).to(TaskEventBatchWorker.class);
   }
 
+  /** Batches storage writes issued in response to task state change events. */
+  public static class TaskEventBatchWorker extends BatchWorker<NoResult> {
+    @Inject
+    TaskEventBatchWorker(Storage store, StatsProvider stats) {
+      super(store, stats, MAX_TASK_EVENT_BATCH_SIZE.get());
+    }
+
+    @Override
+    protected String serviceName() {
+      return "TaskEventBatchWorker";
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index f07746c..c672826 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -29,13 +29,14 @@ import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.slf4j.Logger;
@@ -62,6 +63,7 @@ public class TaskHistoryPruner implements EventSubscriber {
   private final HistoryPrunnerSettings settings;
   private final Storage storage;
   private final Lifecycle lifecycle;
+  private final TaskEventBatchWorker batchWorker;
 
   private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
     @Override
@@ -94,7 +96,8 @@ public class TaskHistoryPruner implements EventSubscriber {
       Clock clock,
       HistoryPrunnerSettings settings,
       Storage storage,
-      Lifecycle lifecycle) {
+      Lifecycle lifecycle,
+      TaskEventBatchWorker batchWorker) {
 
     this.executor = requireNonNull(executor);
     this.stateManager = requireNonNull(stateManager);
@@ -102,6 +105,7 @@ public class TaskHistoryPruner implements EventSubscriber {
     this.settings = requireNonNull(settings);
     this.storage = requireNonNull(storage);
     this.lifecycle = requireNonNull(lifecycle);
+    this.batchWorker = requireNonNull(batchWorker);
   }
 
   @VisibleForTesting
@@ -131,8 +135,10 @@ public class TaskHistoryPruner implements EventSubscriber {
 
   private void deleteTasks(final Set<String> taskIds) {
     LOG.info("Pruning inactive tasks " + taskIds);
-    storage.write(
-        (NoResult.Quiet) storeProvider -> stateManager.deleteTasks(storeProvider, taskIds));
+    batchWorker.execute(storeProvider -> {
+      stateManager.deleteTasks(storeProvider, taskIds);
+      return BatchWorker.NO_RESULT;
+    });
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
index bbd971a..867c9bd 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
@@ -22,13 +22,14 @@ import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.SlidingStats;
 import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
 
 import static java.util.Objects.requireNonNull;
 
@@ -46,8 +47,8 @@ class TaskThrottler implements EventSubscriber {
   private final RescheduleCalculator rescheduleCalculator;
   private final Clock clock;
   private final DelayExecutor executor;
-  private final Storage storage;
   private final StateManager stateManager;
+  private final TaskEventBatchWorker batchWorker;
 
   private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms");
 
@@ -56,14 +57,14 @@ class TaskThrottler implements EventSubscriber {
       RescheduleCalculator rescheduleCalculator,
       Clock clock,
       @AsyncExecutor DelayExecutor executor,
-      Storage storage,
-      StateManager stateManager) {
+      StateManager stateManager,
+      TaskEventBatchWorker batchWorker) {
 
     this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
     this.clock = requireNonNull(clock);
     this.executor = requireNonNull(executor);
-    this.storage = requireNonNull(storage);
     this.stateManager = requireNonNull(stateManager);
+    this.batchWorker = requireNonNull(batchWorker);
   }
 
   @Subscribe
@@ -73,13 +74,16 @@ class TaskThrottler implements EventSubscriber {
           + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask());
       long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
       throttleStats.accumulate(delayMs);
-      executor.execute(
-          () -> storage.write(storeProvider -> stateManager.changeState(
-              storeProvider,
-              stateChange.getTaskId(),
-              Optional.of(THROTTLED),
-              PENDING,
-              Optional.absent())),
+      executor.execute(() ->
+              batchWorker.execute(storeProvider -> {
+                stateManager.changeState(
+                    storeProvider,
+                    stateChange.getTaskId(),
+                    Optional.of(THROTTLED),
+                    PENDING,
+                    Optional.absent());
+                return BatchWorker.NO_RESULT;
+              }),
           Amount.of(delayMs, Time.MILLISECONDS));
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 3c7cda0..574efc9 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -30,6 +30,8 @@ import com.google.common.eventbus.Subscribe;
 import org.apache.aurora.gen.HostStatus;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -37,7 +39,6 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IHostStatus;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -106,11 +107,17 @@ public interface MaintenanceController {
     private static final Logger LOG = LoggerFactory.getLogger(MaintenanceControllerImpl.class);
     private final Storage storage;
     private final StateManager stateManager;
+    private final TaskEventBatchWorker batchWorker;
 
     @Inject
-    public MaintenanceControllerImpl(Storage storage, StateManager stateManager) {
+    public MaintenanceControllerImpl(
+        Storage storage,
+        StateManager stateManager,
+        TaskEventBatchWorker batchWorker) {
+
       this.storage = requireNonNull(storage);
       this.stateManager = requireNonNull(stateManager);
+      this.batchWorker = requireNonNull(batchWorker);
     }
 
     private Set<IHostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) {
@@ -153,7 +160,7 @@ public interface MaintenanceController {
     public void taskChangedState(final TaskStateChange change) {
       if (Tasks.isTerminated(change.getNewState())) {
         final String host = change.getTask().getAssignedTask().getSlaveHost();
-        storage.write((NoResult.Quiet) (MutableStoreProvider store) -> {
+        batchWorker.execute(store -> {
           // If the task _was_ associated with a draining host, and it was the last task on the
           // host.
           Optional<IHostAttributes> attributes =
@@ -168,6 +175,7 @@ public interface MaintenanceController {
               LOG.info("Host {} is DRAINING with active tasks: {}", host, Tasks.ids(activeTasks));
             }
           }
+          return BatchWorker.NO_RESULT;
         });
       }
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index ef6253e..25b3f37 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -44,6 +44,8 @@ import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.Lock;
 import org.apache.aurora.gen.LockKey;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.base.InstanceKeys;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
@@ -120,6 +122,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   private final Clock clock;
   private final PulseHandler pulseHandler;
   private final Lifecycle lifecycle;
+  private final TaskEventBatchWorker batchWorker;
 
   // Currently-active updaters. An active updater is one that is rolling forward or back. Paused
   // and completed updates are represented only in storage, not here.
@@ -134,7 +137,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
       ScheduledExecutorService executor,
       StateManager stateManager,
       Clock clock,
-      Lifecycle lifecycle) {
+      Lifecycle lifecycle,
+      TaskEventBatchWorker batchWorker) {
 
     this.updateFactory = requireNonNull(updateFactory);
     this.lockManager = requireNonNull(lockManager);
@@ -143,6 +147,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
     this.stateManager = requireNonNull(stateManager);
     this.clock = requireNonNull(clock);
     this.lifecycle = requireNonNull(lifecycle);
+    this.batchWorker = requireNonNull(batchWorker);
     this.pulseHandler = new PulseHandler(clock);
   }
 
@@ -346,7 +351,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   }
 
   private void instanceChanged(final IInstanceKey instance, final Optional<IScheduledTask> state) {
-    storage.write((NoResult.Quiet) storeProvider -> {
+    batchWorker.execute(storeProvider -> {
       IJobKey job = instance.getJobKey();
       UpdateFactory.Update update = updates.get(job);
       if (update != null) {
@@ -366,6 +371,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
               + JobKeys.canonicalString(job));
         }
       }
+      return BatchWorker.NO_RESULT;
     });
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
new file mode 100644
index 0000000..a86dc82
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.scheduler.BatchWorker.Result;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertTrue;
+
+public class BatchWorkerTest extends EasyMockTest {
+  private static final String SERVICE_NAME = "TestWorker";
+  private static final String BATCH_STAT = SERVICE_NAME + "_batches_processed";
+  private FakeStatsProvider statsProvider;
+  private BatchWorker<Boolean> batchWorker;
+
+  @Before
+  public void setUp() {
+    StorageTestUtil storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    statsProvider = new FakeStatsProvider();
+    batchWorker = new BatchWorker<Boolean>(storageUtil.storage, statsProvider, 2) {
+      @Override
+      protected String serviceName() {
+        return SERVICE_NAME;
+      }
+    };
+  }
+
+  @Test
+  public void testExecute() throws Exception {
+    control.replay();
+
+    CompletableFuture<Boolean> result1 = batchWorker.execute(store -> true);
+    CompletableFuture<Boolean> result2 = batchWorker.execute(store -> true);
+    CompletableFuture<Boolean> result3 = batchWorker.execute(store -> true);
+    batchWorker.startAsync().awaitRunning();
+
+    assertTrue(result1.get(10L, TimeUnit.SECONDS));
+    assertTrue(result2.get(10L, TimeUnit.SECONDS));
+    assertTrue(result3.get(10L, TimeUnit.SECONDS));
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testExecuteThrows() throws Exception {
+    control.replay();
+
+    CompletableFuture<Boolean> result =
+        batchWorker.execute(store -> { throw new IllegalArgumentException(); });
+    batchWorker.startAsync().awaitRunning();
+
+    result.get(10L, TimeUnit.SECONDS);
+  }
+
+  @Test
+  public void testExecuteWithReplay() throws Exception {
+    BackoffStrategy backoff = createMock(BackoffStrategy.class);
+    final CountDownLatch complete = new CountDownLatch(1);
+
+    expect(backoff.calculateBackoffMs(EasyMock.anyLong())).andReturn(0L).anyTimes();
+
+    control.replay();
+    // The item completes only after more than one batch has run, so it must be replayed.
+    batchWorker.startAsync().awaitRunning();
+    batchWorker.executeWithReplay(
+        backoff,
+        store -> statsProvider.getValue(BATCH_STAT).longValue() > 1L
+            ? new Result<>(true, true)
+            : new Result<>(false, false))
+        .thenAccept(result -> complete.countDown());
+
+    assertTrue(complete.await(10L, TimeUnit.SECONDS));
+  }
+}
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 99c27e8..8469596 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -27,6 +27,7 @@ import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
@@ -48,6 +49,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expectLastCall;
 
@@ -68,20 +70,24 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
   private Command shutdownCommand;
 
   @Before
-  public void setUp() {
+  public void setUp() throws Exception {
     executor = createMock(DelayExecutor.class);
     clock = new FakeClock();
     stateManager = createMock(StateManager.class);
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     shutdownCommand = createMock(Command.class);
+    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
+    expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
+
     pruner = new TaskHistoryPruner(
         executor,
         stateManager,
         clock,
         new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
         storageUtil.storage,
-        new Lifecycle(shutdownCommand));
+        new Lifecycle(shutdownCommand),
+        batchWorker);
     closer = Closer.create();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
index 7d104aa..433f791 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
@@ -24,6 +24,7 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -39,6 +40,7 @@ import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
@@ -60,12 +62,15 @@ public class TaskThrottlerTest extends EasyMockTest {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
+    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
+    expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
+
     throttler = new TaskThrottler(
         rescheduleCalculator,
         clock,
         executor,
-        storageUtil.storage,
-        stateManager);
+        stateManager,
+        batchWorker);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index 94f5ca5..ae83dea 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -30,6 +30,7 @@ import org.apache.aurora.gen.HostStatus;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
@@ -53,6 +54,7 @@ import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
@@ -71,6 +73,8 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
+    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
+    expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
 
     Injector injector = Guice.createInjector(
         new PubsubEventModule(),
@@ -83,6 +87,7 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
             bind(StatsProvider.class).toInstance(new FakeStatsProvider());
             bind(Executor.class).annotatedWith(AsyncExecutor.class)
                 .toInstance(MoreExecutors.directExecutor());
+            bind(TaskEventBatchWorker.class).toInstance(batchWorker);
           }
         });
     maintenance = injector.getInstance(MaintenanceController.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java b/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java
new file mode 100644
index 0000000..46b2e36
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.testing;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.Work;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.easymock.Capture;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+
+import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+
+public final class BatchWorkerUtil {
+  private BatchWorkerUtil() {
+    // Utility class.
+  }
+
+  public static <T> IExpectationSetters<CompletableFuture<T>> expectBatchExecute(
+      BatchWorker<T> batchWorker,
+      Storage storage,
+      IMocksControl control,
+      T resultValue) throws Exception {
+
+    final CompletableFuture<T> result = new EasyMockTest.Clazz<CompletableFuture<T>>() { }
+        .createMock(control);
+    expect(result.get()).andReturn(resultValue).anyTimes();
+
+    final Capture<Work<T>> capture = createCapture();
+    return expect(batchWorker.execute(capture(capture))).andAnswer(() -> {
+      storage.write((Storage.MutateWork.NoResult.Quiet) store -> capture.getValue().apply(store));
+      return result;
+    });
+  }
+
+  public static <T> IExpectationSetters<CompletableFuture<T>> expectBatchExecute(
+      BatchWorker<T> batchWorker,
+      Storage storage,
+      IMocksControl control) throws Exception {
+
+    return expectBatchExecute(batchWorker, storage, control, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index f879827..ea0b89a 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -60,6 +60,7 @@ import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
 import org.apache.aurora.scheduler.base.JobKeys;
@@ -125,6 +126,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
 import static org.apache.aurora.scheduler.updater.UpdateFactory.UpdateFactoryImpl.expandInstanceIds;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
@@ -164,7 +166,7 @@ public class JobUpdaterIT extends EasyMockTest {
   }
 
   @Before
-  public void setUp() {
+  public void setUp() throws Exception {
     // Avoid console spam due to stats registered multiple times.
     Stats.flush();
     ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
@@ -172,6 +174,7 @@ public class JobUpdaterIT extends EasyMockTest {
     driver = createMock(Driver.class);
     shutdownCommand = createMock(Command.class);
     eventBus = new EventBus();
+    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
 
     Injector injector = Guice.createInjector(
         new UpdaterModule(executor),
@@ -195,6 +198,7 @@ public class JobUpdaterIT extends EasyMockTest {
             bind(LockManager.class).to(LockManagerImpl.class);
             bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
             bind(Lifecycle.class).toInstance(new Lifecycle(shutdownCommand));
+            bind(TaskEventBatchWorker.class).toInstance(batchWorker);
           }
         });
     updater = injector.getInstance(JobUpdateController.class);
@@ -204,6 +208,7 @@ public class JobUpdaterIT extends EasyMockTest {
     stateManager = injector.getInstance(StateManager.class);
     eventBus.register(injector.getInstance(JobUpdateEventSubscriber.class));
     subscriber = injector.getInstance(JobUpdateEventSubscriber.class);
+    expectBatchExecute(batchWorker, storage, control).anyTimes();
   }
 
   @After


[2/3] aurora git commit: Batching writes - Part 2 (of 3): Converting cron jobs to use BatchWorker.

Posted by ma...@apache.org.
Batching writes - Part 2 (of 3): Converting cron jobs to use BatchWorker.

Reviewed at https://reviews.apache.org/r/51763/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/2cb43d61
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/2cb43d61
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/2cb43d61

Branch: refs/heads/master
Commit: 2cb43d61ecafb79b31d36332ef4713b9857b3c1a
Parents: ebfeb3e
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Sep 16 14:17:26 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Sep 16 14:17:26 2016 -0700

----------------------------------------------------------------------
 .../aurora/common/util/BackoffHelper.java       |   8 +
 .../aurora/common/util/BackoffHelperTest.java   |   7 +
 .../scheduler/cron/quartz/AuroraCronJob.java    | 239 +++++++++++--------
 .../scheduler/cron/quartz/CronModule.java       |  25 +-
 .../cron/quartz/AuroraCronJobTest.java          | 107 ++++++---
 .../aurora/scheduler/cron/quartz/CronIT.java    |   4 +
 6 files changed, 256 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
index 8e73dd9..517c0ef 100644
--- a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
+++ b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
@@ -90,6 +90,14 @@ public class BackoffHelper {
   }
 
   /**
+   * Gets {@link BackoffStrategy} instance the BackoffHelper is initialized with.
+   * @return instance of {@link BackoffStrategy} used by BackoffHelper.
+   */
+  public BackoffStrategy getBackoffStrategy() {
+    return backoffStrategy;
+  }
+
+  /**
    * Executes the given task using the configured backoff strategy until the task succeeds as
    * indicated by returning a non-null value.
    *

http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java b/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
index bc30990..012fbac 100644
--- a/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
@@ -41,6 +41,13 @@ public class BackoffHelperTest extends EasyMockTest {
   }
 
   @Test
+  public void testGetBackoffStrategy() {
+    control.replay();
+
+    assertEquals(backoffStrategy, backoffHelper.getBackoffStrategy());
+  }
+
+  @Test
   public void testDoUntilSuccess() throws Exception {
     ExceptionalSupplier<Boolean, RuntimeException> task =
         createMock(new Clazz<ExceptionalSupplier<Boolean, RuntimeException>>() { });

http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
index c07551e..7c8047a 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -13,29 +13,37 @@
  */
 package org.apache.aurora.scheduler.cron.quartz;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
 import java.util.Date;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
+import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.BackoffHelper;
 import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.NoResult;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.cron.CronException;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -43,9 +51,14 @@ import org.quartz.DisallowConcurrentExecution;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.quartz.PersistJobDataAfterExecution;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -61,7 +74,8 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLING;
  * scheduler should therefore be configured with a large number of threads.
  */
 @DisallowConcurrentExecution
-class AuroraCronJob implements Job {
+@PersistJobDataAfterExecution
+class AuroraCronJob implements Job, EventSubscriber {
   private static final Logger LOG = LoggerFactory.getLogger(AuroraCronJob.class);
 
   private static final AtomicLong CRON_JOB_TRIGGERS = Stats.exportLong("cron_job_triggers");
@@ -69,147 +83,166 @@ class AuroraCronJob implements Job {
   private static final AtomicLong CRON_JOB_PARSE_FAILURES =
       Stats.exportLong("cron_job_parse_failures");
   private static final AtomicLong CRON_JOB_COLLISIONS = Stats.exportLong("cron_job_collisions");
+  private static final AtomicLong CRON_JOB_CONCURRENT_RUNS =
+      Stats.exportLong("cron_job_concurrent_runs");
 
   @VisibleForTesting
   static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cronScheduler");
 
   private final ConfigurationManager configurationManager;
-  private final Storage storage;
   private final StateManager stateManager;
   private final BackoffHelper delayedStartBackoff;
+  private final BatchWorker<NoResult> batchWorker;
+  private final Set<IJobKey> killFollowups = Sets.newConcurrentHashSet();
+
+  /**
+   * Annotation for the max cron batch size.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface CronMaxBatchSize { }
+
+  static class CronBatchWorker extends BatchWorker<NoResult> {
+    @Inject
+    CronBatchWorker(
+        Storage storage,
+        StatsProvider statsProvider,
+        @CronMaxBatchSize int maxBatchSize) {
+
+      super(storage, statsProvider, maxBatchSize);
+    }
+
+    @Override
+    protected String serviceName() {
+      return "CronBatchWorker";
+    }
+  }
 
   @Inject
   AuroraCronJob(
       ConfigurationManager configurationManager,
       Config config,
-      Storage storage,
-      StateManager stateManager) {
+      StateManager stateManager,
+      CronBatchWorker batchWorker) {
 
     this.configurationManager = requireNonNull(configurationManager);
-    this.storage = requireNonNull(storage);
     this.stateManager = requireNonNull(stateManager);
+    this.batchWorker = requireNonNull(batchWorker);
     this.delayedStartBackoff = requireNonNull(config.getDelayedStartBackoff());
   }
 
-  private static final class DeferredLaunch {
-    private final ITaskConfig task;
-    private final Set<Integer> instanceIds;
-    private final Set<String> activeTaskIds;
-
-    DeferredLaunch(ITaskConfig task, Set<Integer> instanceIds, Set<String> activeTaskIds) {
-      this.task = task;
-      this.instanceIds = instanceIds;
-      this.activeTaskIds = activeTaskIds;
-    }
-  }
-
   @Override
   public void execute(JobExecutionContext context) throws JobExecutionException {
     // We assume quartz prevents concurrent runs of this job for a given job key. This allows us
     // to avoid races where we might kill another run's tasks.
     checkState(context.getJobDetail().isConcurrentExectionDisallowed());
 
-    doExecute(Quartz.auroraJobKey(context.getJobDetail().getKey()));
+    doExecute(context);
   }
 
   @VisibleForTesting
-  void doExecute(final IJobKey key) throws JobExecutionException {
+  void doExecute(JobExecutionContext context) throws JobExecutionException {
+    final IJobKey key = Quartz.auroraJobKey(context.getJobDetail().getKey());
     final String path = JobKeys.canonicalString(key);
 
-    final Optional<DeferredLaunch> deferredLaunch = storage.write(
-        (MutateWork.Quiet<Optional<DeferredLaunch>>) storeProvider -> {
-          Optional<IJobConfiguration> config = storeProvider.getCronJobStore().fetchJob(key);
-          if (!config.isPresent()) {
-            LOG.warn(
-                "Cron was triggered for {} but no job with that key was found in storage.",
-                path);
-            CRON_JOB_MISFIRES.incrementAndGet();
-            return Optional.absent();
-          }
-
-          SanitizedCronJob cronJob;
-          try {
-            cronJob = SanitizedCronJob.fromUnsanitized(configurationManager, config.get());
-          } catch (ConfigurationManager.TaskDescriptionException | CronException e) {
-            LOG.warn(
-                "Invalid cron job for {} in storage - failed to parse with {}", key, e);
-            CRON_JOB_PARSE_FAILURES.incrementAndGet();
-            return Optional.absent();
-          }
-
-          CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy();
-          LOG.info(
-              "Cron triggered for {} at {} with policy {}", path, new Date(), collisionPolicy);
-          CRON_JOB_TRIGGERS.incrementAndGet();
-
-          final Query.Builder activeQuery = Query.jobScoped(key).active();
-          Set<String> activeTasks =
-              Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
+    // Prevent a concurrent run for this job in case a previous trigger took longer to run.
+    // This approach relies on saving the "work in progress" token within the job context itself
+    // (see below) and relying on killFollowups to signal "work completion".
+    if (context.getJobDetail().getJobDataMap().containsKey(path)) {
+      CRON_JOB_CONCURRENT_RUNS.incrementAndGet();
+      if (killFollowups.contains(key)) {
+        context.getJobDetail().getJobDataMap().remove(path);
+        killFollowups.remove(key);
+        LOG.info("Resetting job context for cron " + path);
+      } else {
+        LOG.info("Ignoring trigger as another concurrent run is active for cron " + path);
+        return;
+      }
+    }
 
-          ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
-          Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
-          if (activeTasks.isEmpty()) {
-            stateManager.insertPendingTasks(storeProvider, task, instanceIds);
+    CompletableFuture<NoResult> scheduleResult = batchWorker.<NoResult>execute(storeProvider -> {
+      Optional<IJobConfiguration> config = storeProvider.getCronJobStore().fetchJob(key);
+      if (!config.isPresent()) {
+        LOG.warn("Cron was triggered for {} but no job with that key was found in storage.", path);
+        CRON_JOB_MISFIRES.incrementAndGet();
+        return BatchWorker.NO_RESULT;
+      }
 
-            return Optional.absent();
-          }
+      SanitizedCronJob cronJob;
+      try {
+        cronJob = SanitizedCronJob.fromUnsanitized(configurationManager, config.get());
+      } catch (ConfigurationManager.TaskDescriptionException | CronException e) {
+        LOG.warn("Invalid cron job for {} in storage - failed to parse with {}", key, e);
+        CRON_JOB_PARSE_FAILURES.incrementAndGet();
+        return BatchWorker.NO_RESULT;
+      }
 
-          CRON_JOB_COLLISIONS.incrementAndGet();
-          switch (collisionPolicy) {
-            case KILL_EXISTING:
-              return Optional.of(new DeferredLaunch(task, instanceIds, activeTasks));
+      CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy();
+      LOG.info("Cron triggered for {} at {} with policy {}", path, new Date(), collisionPolicy);
+      CRON_JOB_TRIGGERS.incrementAndGet();
 
-            case RUN_OVERLAP:
-              LOG.error("Ignoring trigger for job {} with deprecated collision"
-                  + "policy RUN_OVERLAP due to unterminated active tasks.", path);
-              return Optional.absent();
+      final Query.Builder activeQuery = Query.jobScoped(key).active();
+      Set<String> activeTasks = Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
 
-            case CANCEL_NEW:
-              return Optional.absent();
+      ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
+      Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
+      if (activeTasks.isEmpty()) {
+        stateManager.insertPendingTasks(storeProvider, task, instanceIds);
+        return BatchWorker.NO_RESULT;
+      }
 
-            default:
-              LOG.error("Unrecognized cron collision policy: " + collisionPolicy);
-              return Optional.absent();
+      CRON_JOB_COLLISIONS.incrementAndGet();
+      switch (collisionPolicy) {
+        case KILL_EXISTING:
+          for (String taskId : activeTasks) {
+            stateManager.changeState(
+                storeProvider,
+                taskId,
+                Optional.absent(),
+                KILLING,
+                KILL_AUDIT_MESSAGE);
           }
-        }
-    );
 
-    if (!deferredLaunch.isPresent()) {
-      return;
-    }
-
-    storage.write((NoResult.Quiet) storeProvider -> {
-      for (String taskId : deferredLaunch.get().activeTaskIds) {
-        stateManager.changeState(
-            storeProvider,
-            taskId,
-            Optional.absent(),
-            KILLING,
-            KILL_AUDIT_MESSAGE);
+          LOG.info("Waiting for job to terminate before launching cron job " + path);
+          // Use job detail map to signal a "work in progress" condition to subsequent triggers.
+          context.getJobDetail().getJobDataMap().put(path, null);
+          batchWorker.executeWithReplay(
+              delayedStartBackoff.getBackoffStrategy(),
+              store -> {
+                Query.Builder query = Query.taskScoped(activeTasks).active();
+                if (Iterables.isEmpty(storeProvider.getTaskStore().fetchTasks(query))) {
+                  LOG.info("Initiating delayed launch of cron " + path);
+                  stateManager.insertPendingTasks(store, task, instanceIds);
+                  return new BatchWorker.Result<>(true, null);
+                } else {
+                  LOG.info("Not yet safe to run cron " + path);
+                  return new BatchWorker.Result<>(false, null);
+                }
+              })
+              .thenAccept(ignored -> {
+                killFollowups.add(key);
+                LOG.info("Finished delayed launch for cron " + path);
+              });
+          break;
+
+        case RUN_OVERLAP:
+          LOG.error("Ignoring trigger for job {} with deprecated collision"
+              + "policy RUN_OVERLAP due to unterminated active tasks.", path);
+          break;
+
+        case CANCEL_NEW:
+          break;
+
+        default:
+          LOG.error("Unrecognized cron collision policy: " + collisionPolicy);
       }
+      return BatchWorker.NO_RESULT;
     });
 
-    LOG.info("Waiting for job to terminate before launching cron job {}.", path);
-
-    final Query.Builder query = Query.taskScoped(deferredLaunch.get().activeTaskIds).active();
     try {
-      // NOTE: We block the quartz execution thread here until we've successfully killed our
-      // ancestor. We mitigate this by using a cached thread pool for quartz.
-      delayedStartBackoff.doUntilSuccess(() -> {
-        if (Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) {
-          LOG.info("Initiating delayed launch of cron " + path);
-          storage.write((NoResult.Quiet) storeProvider -> stateManager.insertPendingTasks(
-              storeProvider,
-              deferredLaunch.get().task,
-              deferredLaunch.get().instanceIds));
-
-          return true;
-        } else {
-          LOG.info("Not yet safe to run cron " + path);
-          return false;
-        }
-      });
-    } catch (InterruptedException e) {
+      scheduleResult.get();
+    } catch (ExecutionException | InterruptedException e) {
       LOG.warn("Interrupted while trying to launch cron " + path, e);
       Thread.currentThread().interrupt();
       throw new JobExecutionException(e);

http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
index 155d702..9c88a2a 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
@@ -21,16 +21,19 @@ import javax.inject.Singleton;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
 
 import org.apache.aurora.common.args.Arg;
 import org.apache.aurora.common.args.CmdLine;
+import org.apache.aurora.common.args.constraints.Positive;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
 import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.cron.quartz.AuroraCronJob.CronBatchWorker;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.quartz.impl.StdSchedulerFactory;
@@ -38,6 +41,7 @@ import org.quartz.simpl.SimpleThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.aurora.scheduler.SchedulerServicesModule.addSchedulerActiveServiceBinding;
 import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_ID;
 import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_MAKE_SCHEDULER_THREAD_DAEMON;
 import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_NAME;
@@ -55,7 +59,7 @@ public class CronModule extends AbstractModule {
 
   @CmdLine(name = "cron_scheduler_num_threads",
       help = "Number of threads to use for the cron scheduler thread pool.")
-  private static final Arg<Integer> NUM_THREADS = Arg.create(100);
+  private static final Arg<Integer> NUM_THREADS = Arg.create(10);
 
   @CmdLine(name = "cron_timezone", help = "TimeZone to use for cron predictions.")
   private static final Arg<String> CRON_TIMEZONE = Arg.create("GMT");
@@ -63,13 +67,18 @@ public class CronModule extends AbstractModule {
   @CmdLine(name = "cron_start_initial_backoff", help =
       "Initial backoff delay while waiting for a previous cron run to be killed.")
   public static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF =
-      Arg.create(Amount.of(1L, Time.SECONDS));
+      Arg.create(Amount.of(5L, Time.SECONDS));
 
   @CmdLine(name = "cron_start_max_backoff", help =
       "Max backoff delay while waiting for a previous cron run to be killed.")
   public static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF =
       Arg.create(Amount.of(1L, Time.MINUTES));
 
+  @Positive
+  @CmdLine(name = "cron_scheduling_max_batch_size",
+      help = "The maximum number of triggered cron jobs that can be processed in a batch.")
+  private static final Arg<Integer> CRON_MAX_BATCH_SIZE = Arg.create(10);
+
   // Global per-JVM ID number generator for the provided Quartz Scheduler.
   private static final AtomicLong ID_GENERATOR = new AtomicLong();
 
@@ -90,8 +99,16 @@ public class CronModule extends AbstractModule {
     bind(AuroraCronJob.Config.class).toInstance(new AuroraCronJob.Config(
         new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get())));
 
+    PubsubEventModule.bindSubscriber(binder(), AuroraCronJob.class);
+
     bind(CronLifecycle.class).in(Singleton.class);
-    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
+    addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
+
+    bind(new TypeLiteral<Integer>() { })
+        .annotatedWith(AuroraCronJob.CronMaxBatchSize.class)
+        .toInstance(CRON_MAX_BATCH_SIZE.get());
+    bind(CronBatchWorker.class).in(Singleton.class);
+    addSchedulerActiveServiceBinding(binder()).to(CronBatchWorker.class);
   }
 
   @Provides

http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
index 5c64ff2..fb06c28 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -13,17 +13,23 @@
  */
 package org.apache.aurora.scheduler.cron.quartz;
 
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 
-import org.apache.aurora.common.base.ExceptionalSupplier;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.BackoffHelper;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.CronCollisionPolicy;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.RepeatableWork;
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.cron.quartz.AuroraCronJob.CronBatchWorker;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -31,11 +37,17 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.easymock.Capture;
-import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.quartz.impl.JobDetailImpl;
 
+import static org.apache.aurora.scheduler.cron.quartz.QuartzTestUtil.AURORA_JOB_KEY;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
@@ -43,50 +55,61 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class AuroraCronJobTest extends EasyMockTest {
-  public static final String TASK_ID = "A";
+  private static final String TASK_ID = "A";
+  private JobDetailImpl jobDetails;
   private Storage storage;
   private StateManager stateManager;
   private BackoffHelper backoffHelper;
-
+  private CronBatchWorker batchWorker;
+  private JobExecutionContext context;
   private AuroraCronJob auroraCronJob;
 
   @Before
-  public void setUp() {
+  public void setUp() throws Exception {
     storage = DbUtil.createStorage();
     stateManager = createMock(StateManager.class);
     backoffHelper = createMock(BackoffHelper.class);
+    context = createMock(JobExecutionContext.class);
+
+    jobDetails = new JobDetailImpl();
+    jobDetails.setKey(Quartz.jobKey(AURORA_JOB_KEY));
+    jobDetails.setJobDataMap(new JobDataMap(new HashMap()));
+    expect(context.getJobDetail()).andReturn(jobDetails).anyTimes();
+
+    batchWorker = createMock(CronBatchWorker.class);
+    expectBatchExecute(batchWorker, storage, control).anyTimes();
 
     auroraCronJob = new AuroraCronJob(
         TaskTestUtil.CONFIGURATION_MANAGER,
-        new AuroraCronJob.Config(backoffHelper), storage, stateManager);
+        new AuroraCronJob.Config(backoffHelper),
+        stateManager,
+        batchWorker);
   }
 
   @Test
   public void testExecuteNonexistentIsNoop() throws JobExecutionException {
     control.replay();
 
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+    auroraCronJob.doExecute(context);
   }
 
   @Test
   public void testEmptyStorage() throws JobExecutionException {
-    stateManager.insertPendingTasks(
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject());
+    stateManager.insertPendingTasks(anyObject(), anyObject(), anyObject());
     expectLastCall().times(3);
 
     control.replay();
+
     populateStorage(CronCollisionPolicy.CANCEL_NEW);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-    storage = DbUtil.createStorage();
+    auroraCronJob.doExecute(context);
 
-    populateStorage(CronCollisionPolicy.KILL_EXISTING);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
     storage = DbUtil.createStorage();
+    populateStorage(CronCollisionPolicy.KILL_EXISTING);
+    auroraCronJob.doExecute(context);
 
+    storage = DbUtil.createStorage();
     populateStorage(CronCollisionPolicy.RUN_OVERLAP);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+    auroraCronJob.doExecute(context);
   }
 
   @Test
@@ -95,35 +118,65 @@ public class AuroraCronJobTest extends EasyMockTest {
 
     populateTaskStore();
     populateStorage(CronCollisionPolicy.CANCEL_NEW);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+    auroraCronJob.doExecute(context);
+  }
+
+  @Test
+  public void testOverlap() throws JobExecutionException {
+    control.replay();
+
+    populateTaskStore();
+    populateStorage(CronCollisionPolicy.RUN_OVERLAP);
+    auroraCronJob.doExecute(context);
   }
 
   @Test
   public void testKillExisting() throws Exception {
-    Capture<ExceptionalSupplier<Boolean, RuntimeException>> capture = createCapture();
+    Capture<RepeatableWork<BatchWorker.NoResult>> killCapture = createCapture();
+    CompletableFuture<BatchWorker.NoResult> killResult = new CompletableFuture<>();
+    expect(batchWorker.executeWithReplay(anyObject(), capture(killCapture))).andReturn(killResult);
 
+    expect(backoffHelper.getBackoffStrategy()).andReturn(null).anyTimes();
     expect(stateManager.changeState(
-        EasyMock.anyObject(),
+        anyObject(),
         eq(TASK_ID),
         eq(Optional.absent()),
         eq(ScheduleStatus.KILLING),
         eq(AuroraCronJob.KILL_AUDIT_MESSAGE)))
         .andReturn(StateChangeResult.SUCCESS);
-    backoffHelper.doUntilSuccess(EasyMock.capture(capture));
-    stateManager.insertPendingTasks(
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject());
+    stateManager.insertPendingTasks(anyObject(), anyObject(), anyObject());
+    expectLastCall().times(2);
 
     control.replay();
 
     populateStorage(CronCollisionPolicy.KILL_EXISTING);
     populateTaskStore();
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-    assertFalse(capture.getValue().get());
+    auroraCronJob.doExecute(context);
+
     storage.write(
         (NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().deleteAllTasks());
-    assertTrue(capture.getValue().get());
+    storage.write((NoResult.Quiet) store -> killCapture.getValue().apply(store));
+
+    // Simulate a trigger in progress.
+    jobDetails.getJobDataMap().put(JobKeys.canonicalString(AURORA_JOB_KEY), null);
+    assertFalse(jobDetails.getJobDataMap().isEmpty());
+
+    // Attempt a concurrent run that must be rejected.
+    auroraCronJob.doExecute(context);
+
+    // Complete previous run and trigger another one.
+    killResult.complete(BatchWorker.NO_RESULT);
+    auroraCronJob.doExecute(context);
+    assertTrue(jobDetails.getJobDataMap().isEmpty());
+  }
+
+  @Test
+  public void testNoConcurrentRun() throws Exception {
+    jobDetails.getJobDataMap().put(JobKeys.canonicalString(AURORA_JOB_KEY), null);
+
+    control.replay();
+
+    auroraCronJob.doExecute(context);
   }
 
   private void populateTaskStore() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
index 1c0a3fa..8556253 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -21,6 +21,7 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.util.Modules;
 
+import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.Container;
@@ -34,6 +35,7 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CrontabEntry;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -95,6 +97,8 @@ public class CronIT extends EasyMockTest {
             bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
             bind(StateManager.class).toInstance(stateManager);
             bind(Storage.class).toInstance(storage);
+            bind(StatsProvider.class).toInstance(createMock(StatsProvider.class));
+            bind(EventSink.class).toInstance(createMock(EventSink.class));
           }
         });
   }


[3/3] aurora git commit: Batching writes - Part 3 (of 3): Converting TaskScheduler to use BatchWorker.

Posted by ma...@apache.org.
Batching writes - Part 3 (of 3): Converting TaskScheduler to use BatchWorker.

Reviewed at https://reviews.apache.org/r/51765/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/496397aa
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/496397aa
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/496397aa

Branch: refs/heads/master
Commit: 496397aa5a9534d04bcc273e50a6b9de204ae133
Parents: 2cb43d6
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Sep 16 14:17:48 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Sep 16 14:17:48 2016 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  | 20 ++++---
 .../scheduler/scheduling/SchedulingModule.java  | 13 ++++
 .../aurora/scheduler/scheduling/TaskGroups.java | 62 ++++++++++++++++++--
 .../scheduler/scheduling/TaskScheduler.java     | 17 ++----
 .../scheduler/http/AbstractJettyTest.java       |  2 +
 .../scheduler/scheduling/TaskGroupsTest.java    | 35 +++++++----
 .../scheduling/TaskSchedulerImplTest.java       | 19 +++---
 7 files changed, 125 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 9d0d40b..6f1cbfb 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -190,9 +190,11 @@ public class SchedulingBenchmarks {
     private void fillUpCluster(int numOffers) {
       Set<IScheduledTask> tasksToAssign = buildClusterTasks(numOffers);
       saveTasks(tasksToAssign);
-      for (IScheduledTask scheduledTask : tasksToAssign) {
-        taskScheduler.schedule(scheduledTask.getAssignedTask().getTaskId());
-      }
+      storage.write((NoResult.Quiet) store -> {
+        for (IScheduledTask scheduledTask : tasksToAssign) {
+          taskScheduler.schedule(store, scheduledTask.getAssignedTask().getTaskId());
+        }
+      });
     }
 
     private void saveTasks(final Set<IScheduledTask> tasks) {
@@ -219,11 +221,13 @@ public class SchedulingBenchmarks {
      */
     @Benchmark
     public boolean runBenchmark() {
-      boolean result = false;
-      for (IScheduledTask task : settings.getTasks()) {
-        result = taskScheduler.schedule(task.getAssignedTask().getTaskId());
-      }
-      return result;
+      return storage.write((Storage.MutateWork.Quiet<Boolean>) store -> {
+        boolean result = false;
+        for (IScheduledTask task : settings.getTasks()) {
+          result = taskScheduler.schedule(store, task.getAssignedTask().getTaskId());
+        }
+        return result;
+      });
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index 11e8033..664bc6c 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -31,6 +31,8 @@ import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl;
 
+import static org.apache.aurora.scheduler.SchedulerServicesModule.addSchedulerActiveServiceBinding;
+
 /**
  * Binding module for task scheduling logic.
  */
@@ -83,6 +85,11 @@ public class SchedulingModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
       Arg.create(Amount.of(3L, Time.MINUTES));
 
+  @Positive
+  @CmdLine(name = "scheduling_max_batch_size",
+      help = "The maximum number of scheduling attempts that can be processed in a batch.")
+  private static final Arg<Integer> SCHEDULING_MAX_BATCH_SIZE = Arg.create(3);
+
   @Override
   protected void configure() {
     install(new PrivateModule() {
@@ -109,6 +116,12 @@ public class SchedulingModule extends AbstractModule {
     });
     PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
 
+    bind(new TypeLiteral<Integer>() { })
+        .annotatedWith(TaskGroups.SchedulingMaxBatchSize.class)
+        .toInstance(SCHEDULING_MAX_BATCH_SIZE.get());
+    bind(TaskGroups.TaskGroupBatchWorker.class).in(Singleton.class);
+    addSchedulerActiveServiceBinding(binder()).to(TaskGroups.TaskGroupBatchWorker.class);
+
     install(new PrivateModule() {
       @Override
       protected void configure() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
index c044ebe..d390c07 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -13,12 +13,19 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 
 import javax.inject.Inject;
+import javax.inject.Qualifier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -28,7 +35,9 @@ import com.google.common.util.concurrent.RateLimiter;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.scheduler.BatchWorker;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -36,9 +45,14 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
@@ -61,12 +75,38 @@ public class TaskGroups implements EventSubscriber {
   private final long firstScheduleDelay;
   private final BackoffStrategy backoff;
   private final RescheduleCalculator rescheduleCalculator;
+  private final BatchWorker<Boolean> batchWorker;
 
   // Track the penalties of tasks at the time they were scheduled. This is to provide data that
   // may influence the selection of a different backoff strategy.
   private final SlidingStats scheduledTaskPenalties =
       new SlidingStats("scheduled_task_penalty", "ms");
 
+  /**
+   * Annotation for the max scheduling batch size.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface SchedulingMaxBatchSize { }
+
+  @VisibleForTesting
+  public static class TaskGroupBatchWorker extends BatchWorker<Boolean> {
+    @Inject
+    TaskGroupBatchWorker(
+        Storage storage,
+        StatsProvider statsProvider,
+        @SchedulingMaxBatchSize int maxBatchSize) {
+
+      super(storage, statsProvider, maxBatchSize);
+    }
+
+    @Override
+    protected String serviceName() {
+      return "TaskGroupBatchWorker";
+    }
+  }
+
   public static class TaskGroupsSettings {
     private final Amount<Long, Time> firstScheduleDelay;
     private final BackoffStrategy taskGroupBackoff;
@@ -88,7 +128,8 @@ public class TaskGroups implements EventSubscriber {
       @AsyncExecutor DelayExecutor executor,
       TaskGroupsSettings settings,
       TaskScheduler taskScheduler,
-      RescheduleCalculator rescheduleCalculator) {
+      RescheduleCalculator rescheduleCalculator,
+      TaskGroupBatchWorker batchWorker) {
 
     requireNonNull(settings.firstScheduleDelay);
     Preconditions.checkArgument(settings.firstScheduleDelay.getValue() > 0);
@@ -99,10 +140,11 @@ public class TaskGroups implements EventSubscriber {
     this.firstScheduleDelay = settings.firstScheduleDelay.as(Time.MILLISECONDS);
     this.backoff = requireNonNull(settings.taskGroupBackoff);
     this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
+    this.batchWorker = requireNonNull(batchWorker);
 
-    this.taskScheduler = taskId -> {
+    this.taskScheduler = (store, taskId) -> {
       settings.rateLimiter.acquire();
-      return taskScheduler.schedule(taskId);
+      return taskScheduler.schedule(store, taskId);
     };
   }
 
@@ -120,10 +162,20 @@ public class TaskGroups implements EventSubscriber {
     Runnable monitor = new Runnable() {
       @Override
       public void run() {
-        Optional<String> taskId = group.peek();
+        final Optional<String> taskId = group.peek();
         long penaltyMs = 0;
         if (taskId.isPresent()) {
-          if (taskScheduler.schedule(taskId.get())) {
+          CompletableFuture<Boolean> result = batchWorker.execute(storeProvider ->
+              taskScheduler.schedule(storeProvider, taskId.get()));
+          boolean isScheduled = false;
+          try {
+            isScheduled = result.get();
+          } catch (ExecutionException | InterruptedException e) {
+            Thread.currentThread().interrupt();
+            Throwables.propagate(e);
+          }
+
+          if (isScheduled) {
             scheduledTaskPenalties.accumulate(group.getPenaltyMs());
             group.remove(taskId.get());
             if (group.hasMore()) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index d266f6a..207d38d 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -38,7 +38,6 @@ import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -63,11 +62,12 @@ public interface TaskScheduler extends EventSubscriber {
   /**
    * Attempts to schedule a task, possibly performing irreversible actions.
    *
+   * @param storeProvider {@code MutableStoreProvider} instance to access data store.
    * @param taskId The task to attempt to schedule.
    * @return {@code true} if the task was scheduled, {@code false} otherwise. The caller should
    *         call schedule again if {@code false} is returned.
    */
-  boolean schedule(String taskId);
+  boolean schedule(MutableStoreProvider storeProvider, String taskId);
 
   /**
    * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
@@ -86,7 +86,6 @@ public interface TaskScheduler extends EventSubscriber {
 
     private static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerImpl.class);
 
-    private final Storage storage;
     private final TaskAssigner assigner;
     private final Preemptor preemptor;
     private final ExecutorSettings executorSettings;
@@ -98,25 +97,22 @@ public interface TaskScheduler extends EventSubscriber {
 
     @Inject
     TaskSchedulerImpl(
-        Storage storage,
         TaskAssigner assigner,
         Preemptor preemptor,
         ExecutorSettings executorSettings,
         BiCache<String, TaskGroupKey> reservations) {
 
-      this.storage = requireNonNull(storage);
       this.assigner = requireNonNull(assigner);
       this.preemptor = requireNonNull(preemptor);
       this.executorSettings = requireNonNull(executorSettings);
       this.reservations = requireNonNull(reservations);
     }
 
-    @Timed("task_schedule_attempt")
-    @Override
-    public boolean schedule(final String taskId) {
+    @Timed ("task_schedule_attempt")
+    public boolean schedule(MutableStoreProvider store, String taskId) {
       attemptsFired.incrementAndGet();
       try {
-        return storage.write(store -> scheduleTask(store, taskId));
+        return scheduleTask(store, taskId);
       } catch (RuntimeException e) {
         // We catch the generic unchecked exception here to ensure tasks are not abandoned
         // if there is a transient issue resulting in an unchecked exception.
@@ -126,8 +122,7 @@ public interface TaskScheduler extends EventSubscriber {
       }
     }
 
-    @Timed("task_schedule_attempt_locked")
-    protected boolean scheduleTask(MutableStoreProvider store, String taskId) {
+    private boolean scheduleTask(MutableStoreProvider store, String taskId) {
       LOG.debug("Attempting to schedule task " + taskId);
       IAssignedTask assignedTask = Iterables.getOnlyElement(
           Iterables.transform(

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
index c2ceb4e..c1c3eca 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
@@ -54,6 +54,7 @@ import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler;
 import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
+import org.apache.aurora.scheduler.scheduling.TaskGroups;
 import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
 import org.apache.aurora.scheduler.scheduling.TaskScheduler;
 import org.apache.aurora.scheduler.state.LockManager;
@@ -128,6 +129,7 @@ public abstract class AbstractJettyTest extends EasyMockTest {
             bindMock(TaskScheduler.class);
             bindMock(TierManager.class);
             bindMock(Thread.UncaughtExceptionHandler.class);
+            bindMock(TaskGroups.TaskGroupBatchWorker.class);
 
             bind(ServletContextListener.class).toProvider(() -> {
               return makeServletContextListener(injector, getChildServletModule());

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
index 95cf25e..8872962 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
@@ -29,15 +29,20 @@ import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupBatchWorker;
 import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 
 public class TaskGroupsTest extends EasyMockTest {
@@ -52,26 +57,33 @@ public class TaskGroupsTest extends EasyMockTest {
   private FakeScheduledExecutor clock;
   private RescheduleCalculator rescheduleCalculator;
   private TaskGroups taskGroups;
+  private TaskGroupBatchWorker batchWorker;
+  private StorageTestUtil storageUtil;
 
   @Before
   public void setUp() throws Exception {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
     DelayExecutor executor = createMock(DelayExecutor.class);
     clock = FakeScheduledExecutor.fromDelayExecutor(executor);
     backoffStrategy = createMock(BackoffStrategy.class);
     taskScheduler = createMock(TaskScheduler.class);
     rateLimiter = createMock(RateLimiter.class);
     rescheduleCalculator = createMock(RescheduleCalculator.class);
+    batchWorker = createMock(TaskGroupBatchWorker.class);
     taskGroups = new TaskGroups(
         executor,
         new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter),
         taskScheduler,
-        rescheduleCalculator);
+        rescheduleCalculator,
+        batchWorker);
   }
 
   @Test
-  public void testEvaluatedAfterFirstSchedulePenalty() {
+  public void testEvaluatedAfterFirstSchedulePenalty() throws Exception {
     expect(rateLimiter.acquire()).andReturn(0D);
-    expect(taskScheduler.schedule(TASK_A_ID)).andReturn(true);
+    expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true);
+    expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
 
     control.replay();
 
@@ -80,10 +92,10 @@ public class TaskGroupsTest extends EasyMockTest {
   }
 
   @Test
-  public void testTaskDeletedBeforeEvaluating() {
+  public void testTaskDeletedBeforeEvaluating() throws Exception {
     final IScheduledTask task = makeTask(TASK_A_ID);
     expect(rateLimiter.acquire()).andReturn(0D);
-    expect(taskScheduler.schedule(Tasks.id(task))).andAnswer(() -> {
+    expect(taskScheduler.schedule(anyObject(), eq(Tasks.id(task)))).andAnswer(() -> {
       // Test a corner case where a task is deleted while it is being evaluated by the task
       // scheduler.  If not handled carefully, this could result in the scheduler trying again
       // later to satisfy the deleted task.
@@ -91,6 +103,7 @@ public class TaskGroupsTest extends EasyMockTest {
 
       return false;
     });
+    expectBatchExecute(batchWorker, storageUtil.storage, control, false).anyTimes();
     expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY.as(Time.MILLISECONDS)))
         .andReturn(0L);
 
@@ -101,10 +114,11 @@ public class TaskGroupsTest extends EasyMockTest {
   }
 
   @Test
-  public void testEvaluatedOnStartup() {
+  public void testEvaluatedOnStartup() throws Exception {
     expect(rateLimiter.acquire()).andReturn(0D);
     expect(rescheduleCalculator.getStartupScheduleDelayMs(makeTask(TASK_A_ID))).andReturn(1L);
-    expect(taskScheduler.schedule(TASK_A_ID)).andReturn(true);
+    expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true);
+    expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
 
     control.replay();
 
@@ -114,10 +128,11 @@ public class TaskGroupsTest extends EasyMockTest {
   }
 
   @Test
-  public void testResistStarvation() {
+  public void testResistStarvation() throws Exception {
     expect(rateLimiter.acquire()).andReturn(0D).times(2);
-    expect(taskScheduler.schedule("a0")).andReturn(true);
-    expect(taskScheduler.schedule("b0")).andReturn(true);
+    expect(taskScheduler.schedule(anyObject(), eq("a0"))).andReturn(true);
+    expect(taskScheduler.schedule(anyObject(), eq("b0"))).andReturn(true);
+    expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index 72562e6..a4e87d2 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -157,7 +157,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertTrue(scheduler.schedule("a"));
+    assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   @Test
@@ -169,7 +169,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertTrue(scheduler.schedule("a"));
+    assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   @Test
@@ -201,9 +201,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(scheduler.schedule("a"));
-    assertFalse(scheduler.schedule("a"));
-    assertTrue(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+    assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   @Test
@@ -218,7 +218,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   @Test
@@ -233,7 +233,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   @Test
@@ -281,7 +281,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertTrue(scheduler.schedule(Tasks.id(taskA)));
+    memStorage.write((NoResult.Quiet)
+        store -> assertTrue(scheduler.schedule(store, Tasks.id(taskA))));
   }
 
   @Test
@@ -295,7 +296,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {