You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/08/05 21:40:13 UTC

aurora git commit: Integrating DelayExecutor into the scheduler's transaction handling.

Repository: aurora
Updated Branches:
  refs/heads/master ae6e8575b -> 61c63ea9e


Integrating DelayExecutor into the scheduler's transaction handling.

Bugs closed: AURORA-1395

Reviewed at https://reviews.apache.org/r/37049/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/61c63ea9
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/61c63ea9
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/61c63ea9

Branch: refs/heads/master
Commit: 61c63ea9e79675bd415fbae0cd5d16f51ebd63f2
Parents: ae6e857
Author: Bill Farner <wf...@apache.org>
Authored: Wed Aug 5 12:39:52 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Aug 5 12:39:52 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  | 40 ++++-------
 .../aurora/benchmark/ThriftApiBenchmarks.java   |  2 +
 .../aurora/scheduler/async/AsyncModule.java     | 76 ++++++++++++--------
 .../scheduler/async/GatedDelayExecutor.java     |  7 ++
 .../aurora/scheduler/offers/OfferManager.java   | 12 ++--
 .../scheduler/pruning/TaskHistoryPruner.java    | 14 ++--
 .../scheduler/reconciliation/KillRetry.java     | 11 +--
 .../reconciliation/ReconciliationModule.java    | 20 ++++++
 .../reconciliation/TaskReconciler.java          |  4 +-
 .../scheduler/reconciliation/TaskTimeout.java   | 18 +++--
 .../aurora/scheduler/scheduling/TaskGroups.java | 64 +++--------------
 .../scheduler/scheduling/TaskThrottler.java     | 15 ++--
 .../aurora/scheduler/storage/db/DbModule.java   | 29 +++++---
 .../aurora/scheduler/storage/db/DbStorage.java  | 23 +++++-
 .../aurora/scheduler/storage/db/DbUtil.java     |  2 +
 .../aurora/scheduler/async/AsyncModuleTest.java |  6 +-
 .../scheduler/http/JettyServerModuleTest.java   |  5 ++
 .../http/api/security/HttpSecurityIT.java       |  7 --
 .../scheduler/offers/OfferManagerImplTest.java  |  6 +-
 .../pruning/TaskHistoryPrunerTest.java          | 62 +++++++++-------
 .../scheduler/reconciliation/KillRetryTest.java | 17 ++---
 .../reconciliation/TaskTimeoutTest.java         | 16 ++---
 .../scheduler/scheduling/TaskGroupsTest.java    | 12 ++--
 .../scheduler/scheduling/TaskThrottlerTest.java | 14 ++--
 .../scheduler/storage/db/DbStorageTest.java     |  6 ++
 .../testing/FakeScheduledExecutor.java          | 35 ++++++++-
 26 files changed, 286 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index d75f090..e41b299 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -14,14 +14,11 @@
 package org.apache.aurora.benchmark;
 
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.inject.Singleton;
 
 import com.google.common.eventbus.EventBus;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -39,6 +36,7 @@ import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.async.AsyncModule;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -90,7 +88,6 @@ public class SchedulingBenchmarks {
     private static final Amount<Long, Time> DELAY_FOREVER = Amount.of(30L, Time.DAYS);
     protected Storage storage;
     protected PendingTaskProcessor pendingTaskProcessor;
-    protected ScheduledThreadPoolExecutor executor;
     private TaskScheduler taskScheduler;
     private OfferManager offerManager;
     private EventBus eventBus;
@@ -105,11 +102,6 @@ public class SchedulingBenchmarks {
       eventBus = new EventBus();
       final FakeClock clock = new FakeClock();
       clock.setNowMillis(System.currentTimeMillis());
-      executor = new ScheduledThreadPoolExecutor(
-          1,
-          new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setNameFormat("TestProcessor-%d").build());
 
       // TODO(maxim): Find a way to DRY it and reuse existing modules instead.
       Injector injector = Guice.createInjector(
@@ -118,18 +110,23 @@ public class SchedulingBenchmarks {
           new PrivateModule() {
             @Override
             protected void configure() {
-              bind(ScheduledExecutorService.class)
-                  .annotatedWith(AsyncModule.AsyncExecutor.class)
-                  .toInstance(executor);
-              bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
-              bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
-              bind(OfferManager.OfferReturnDelay.class).toInstance(
-                  new OfferManager.OfferReturnDelay() {
+              // We use a no-op executor for async work, as this benchmark is focused on the
+              // synchronous scheduling operations.
+              bind(DelayExecutor.class).annotatedWith(AsyncModule.AsyncExecutor.class)
+                  .toInstance(new DelayExecutor() {
+                    @Override
+                    public void execute(Runnable work, Amount<Long, Time> minDelay) {
+                      // No-op.
+                    }
+
                     @Override
-                    public Amount<Long, Time> get() {
-                      return DELAY_FOREVER;
+                    public void execute(Runnable command) {
+                      // No-op.
                     }
                   });
+              bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
+              bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
+              bind(OfferManager.OfferReturnDelay.class).toInstance(() -> DELAY_FOREVER);
               bind(BiCache.BiCacheSettings.class).toInstance(
                   new BiCache.BiCacheSettings(DELAY_FOREVER, ""));
               bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
@@ -183,13 +180,6 @@ public class SchedulingBenchmarks {
       saveTasks(settings.getTasks());
     }
 
-    @Setup(Level.Iteration)
-    public void setUpIteration() {
-      // Clear executor queue between iterations. Otherwise, executor tasks keep piling up and
-      // affect benchmark performance due to memory pressure and excessive GC.
-      executor.getQueue().clear();
-    }
-
     private Set<IScheduledTask> buildClusterTasks(int numOffers) {
       int numOffersToFill = (int) Math.round(numOffers * settings.getClusterUtilization());
       return new Tasks.Builder()

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
index b2a3e9b..4ddfea2 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
@@ -32,6 +32,7 @@ import org.apache.aurora.gen.ReadOnlyScheduler;
 import org.apache.aurora.gen.Response;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.TaskQuery;
+import org.apache.aurora.scheduler.async.AsyncModule;
 import org.apache.aurora.scheduler.cron.CronPredictor;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.state.LockManager;
@@ -151,6 +152,7 @@ public class ThriftApiBenchmarks {
             bind(StatsProvider.class).toInstance(new FakeStatsProvider());
           }
         },
+        new AsyncModule(),
         DbModule.productionModule(Bindings.KeyFactory.PLAIN),
         new ThriftModule.ReadOnly());
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index c345c92..8416ea0 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -15,18 +15,18 @@ package org.apache.aurora.scheduler.async;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 import javax.inject.Qualifier;
+import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
 import com.twitter.common.stats.StatsProvider;
@@ -48,61 +48,77 @@ public class AsyncModule extends AbstractModule {
 
   @CmdLine(name = "async_worker_threads",
       help = "The number of worker threads to process async task operations with.")
-  private static final Arg<Integer> ASYNC_WORKER_THREADS = Arg.create(1);
+  private static final Arg<Integer> ASYNC_WORKER_THREADS = Arg.create(8);
+  private final ScheduledThreadPoolExecutor afterTransaction;
 
   @Qualifier
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
   public @interface AsyncExecutor { }
 
-  @VisibleForTesting
-  static final String TIMEOUT_QUEUE_GAUGE = "timeout_queue_size";
+  public AsyncModule() {
+    // Don't worry about clean shutdown, these can be daemon and cleanup-free.
+    // TODO(wfarner): Should we use a bounded caching thread pool executor instead?
+    this(AsyncUtil.loggingScheduledExecutor(ASYNC_WORKER_THREADS.get(), "AsyncProcessor-%d", LOG));
+  }
 
   @VisibleForTesting
-  static final String ASYNC_TASKS_GAUGE = "async_tasks_completed";
+  public AsyncModule(ScheduledThreadPoolExecutor executor) {
+    this.afterTransaction = requireNonNull(executor);
+  }
 
   @Override
   protected void configure() {
-    // Don't worry about clean shutdown, these can be daemon and cleanup-free.
-    final ScheduledThreadPoolExecutor executor =
-        AsyncUtil.loggingScheduledExecutor(ASYNC_WORKER_THREADS.get(), "AsyncProcessor-%d", LOG);
-    bind(ScheduledThreadPoolExecutor.class).annotatedWith(AsyncExecutor.class).toInstance(executor);
-    bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class).toInstance(executor);
-    bind(ExecutorService.class).annotatedWith(AsyncExecutor.class).toInstance(executor);
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(ScheduledThreadPoolExecutor.class).toInstance(afterTransaction);
+        bind(ScheduledExecutorService.class).toInstance(afterTransaction);
+
+        bind(GatedDelayExecutor.class).in(Singleton.class);
+        expose(GatedDelayExecutor.class);
+
+        bind(RegisterGauges.class).in(Singleton.class);
+        expose(RegisterGauges.class);
+      }
+    });
     SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class);
+
+    bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
+    bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
   }
 
   static class RegisterGauges extends AbstractIdleService {
+    @VisibleForTesting
+    static final String TIMEOUT_QUEUE_GAUGE = "timeout_queue_size";
+
+    @VisibleForTesting
+    static final String ASYNC_TASKS_GAUGE = "async_tasks_completed";
+
+    @VisibleForTesting
+    static final String DELAY_QUEUE_GAUGE = "delay_executor_queue_size";
+
     private final StatsProvider statsProvider;
     private final ScheduledThreadPoolExecutor executor;
+    private final GatedDelayExecutor delayExecutor;
 
     @Inject
     RegisterGauges(
         StatsProvider statsProvider,
-        @AsyncExecutor ScheduledThreadPoolExecutor executor) {
+        ScheduledThreadPoolExecutor executor,
+        GatedDelayExecutor delayExecutor) {
 
       this.statsProvider = requireNonNull(statsProvider);
       this.executor = requireNonNull(executor);
+      this.delayExecutor = requireNonNull(delayExecutor);
     }
 
     @Override
     protected void startUp() {
-      statsProvider.makeGauge(
-          TIMEOUT_QUEUE_GAUGE,
-          new Supplier<Integer>() {
-            @Override
-            public Integer get() {
-              return executor.getQueue().size();
-            }
-          });
-      statsProvider.makeGauge(
-          ASYNC_TASKS_GAUGE,
-          new Supplier<Long>() {
-            @Override
-            public Long get() {
-              return executor.getCompletedTaskCount();
-            }
-          }
-      );
+      statsProvider.makeGauge(TIMEOUT_QUEUE_GAUGE, () -> executor.getQueue().size());
+      statsProvider.makeGauge(ASYNC_TASKS_GAUGE, executor::getCompletedTaskCount);
+      // Using a lambda rather than method ref to sidestep a bug in PMD that makes it think
+      // delayExecutor is unused.
+      statsProvider.makeGauge(DELAY_QUEUE_GAUGE, () -> delayExecutor.getQueueSize());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
index 1893a9b..2889e79 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
@@ -16,6 +16,8 @@ package org.apache.aurora.scheduler.async;
 import java.util.Queue;
 import java.util.concurrent.ScheduledExecutorService;
 
+import javax.inject.Inject;
+
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.twitter.common.quantity.Amount;
@@ -36,10 +38,15 @@ class GatedDelayExecutor implements DelayExecutor, FlushableWorkQueue {
    *
    * @param delegate Delegate to execute work with when flushed.
    */
+  @Inject
   GatedDelayExecutor(ScheduledExecutorService delegate) {
     this.executor = requireNonNull(delegate);
   }
 
+  synchronized int getQueueSize() {
+    return queue.size();
+  }
+
   private synchronized void enqueue(Runnable work) {
     queue.add(work);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 4b8a55f..d1ebad1 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -17,8 +17,6 @@ import java.util.Comparator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
 
@@ -43,6 +41,7 @@ import com.twitter.common.stats.Stats;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -159,14 +158,14 @@ public interface OfferManager extends EventSubscriber {
 
     private final Driver driver;
     private final OfferReturnDelay returnDelay;
-    private final ScheduledExecutorService executor;
+    private final DelayExecutor executor;
 
     @Inject
     @VisibleForTesting
     public OfferManagerImpl(
         Driver driver,
         OfferReturnDelay returnDelay,
-        @AsyncExecutor ScheduledExecutorService executor) {
+        @AsyncExecutor DelayExecutor executor) {
 
       this.driver = requireNonNull(driver);
       this.returnDelay = requireNonNull(returnDelay);
@@ -190,15 +189,14 @@ public interface OfferManager extends EventSubscriber {
         removeAndDecline(sameSlave.get().getOffer().getId());
       } else {
         hostOffers.add(offer);
-        executor.schedule(
+        executor.execute(
             new Runnable() {
               @Override
               public void run() {
                 removeAndDecline(offer.getOffer().getId());
               }
             },
-            returnDelay.get().as(Time.MILLISECONDS),
-            TimeUnit.MILLISECONDS);
+            returnDelay.get());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index fa9a09c..3cff5bb 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -14,8 +14,6 @@
 package org.apache.aurora.scheduler.pruning;
 
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
@@ -32,6 +30,7 @@ import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -51,7 +50,7 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 public class TaskHistoryPruner implements EventSubscriber {
   private static final Logger LOG = Logger.getLogger(TaskHistoryPruner.class.getName());
 
-  private final ScheduledExecutorService executor;
+  private final DelayExecutor executor;
   private final StateManager stateManager;
   private final Clock clock;
   private final HistoryPrunnerSettings settings;
@@ -83,7 +82,7 @@ public class TaskHistoryPruner implements EventSubscriber {
 
   @Inject
   TaskHistoryPruner(
-      @AsyncExecutor ScheduledExecutorService executor,
+      @AsyncExecutor DelayExecutor executor,
       StateManager stateManager,
       Clock clock,
       HistoryPrunnerSettings settings,
@@ -142,7 +141,7 @@ public class TaskHistoryPruner implements EventSubscriber {
       long timeRemaining) {
 
     LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
-    executor.schedule(
+    executor.execute(
         new Runnable() {
           @Override
           public void run() {
@@ -150,10 +149,9 @@ public class TaskHistoryPruner implements EventSubscriber {
             deleteTasks(ImmutableSet.of(taskId));
           }
         },
-        timeRemaining,
-        TimeUnit.MILLISECONDS);
+        Amount.of(timeRemaining, Time.MILLISECONDS));
 
-    executor.submit(new Runnable() {
+    executor.execute(new Runnable() {
       @Override
       public void run() {
         Iterable<IScheduledTask> inactiveTasks =

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
index 1611a3b..b422fa1 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
@@ -13,8 +13,6 @@
  */
 package org.apache.aurora.scheduler.reconciliation;
 
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
 
@@ -23,11 +21,14 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.Subscribe;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.BackoffStrategy;
 
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -48,7 +49,7 @@ public class KillRetry implements EventSubscriber {
 
   private final Driver driver;
   private final Storage storage;
-  private final ScheduledExecutorService executor;
+  private final DelayExecutor executor;
   private final BackoffStrategy backoffStrategy;
   private final AtomicLong killRetries;
 
@@ -56,7 +57,7 @@ public class KillRetry implements EventSubscriber {
   KillRetry(
       Driver driver,
       Storage storage,
-      @AsyncExecutor ScheduledExecutorService executor,
+      @AsyncExecutor DelayExecutor executor,
       BackoffStrategy backoffStrategy,
       StatsProvider statsProvider) {
 
@@ -84,7 +85,7 @@ public class KillRetry implements EventSubscriber {
 
     void tryLater() {
       retryInMs.set(backoffStrategy.calculateBackoffMs(retryInMs.get()));
-      executor.schedule(this, retryInMs.get(), TimeUnit.MILLISECONDS);
+      executor.execute(this, Amount.of(retryInMs.get(), Time.MILLISECONDS));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
index 406c077..2677238 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
@@ -13,6 +13,12 @@
  */
 package org.apache.aurora.scheduler.reconciliation;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Logger;
+
+import javax.inject.Qualifier;
 import javax.inject.Singleton;
 
 import com.google.inject.AbstractModule;
@@ -27,14 +33,22 @@ import com.twitter.common.util.BackoffStrategy;
 import com.twitter.common.util.TruncatedBinaryBackoff;
 
 import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.reconciliation.TaskReconciler.TaskReconcilerSettings;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
 /**
  * Binding module for state reconciliation and retry logic.
  */
 public class ReconciliationModule extends AbstractModule {
 
+  private static final Logger LOG = Logger.getLogger(ReconciliationModule.class.getName());
+
   @CmdLine(name = "transient_task_state_timeout",
       help = "The amount of time after which to treat a task stuck in a transient state as LOST.")
   private static final Arg<Amount<Long, Time>> TRANSIENT_TASK_STATE_TIMEOUT =
@@ -73,6 +87,10 @@ public class ReconciliationModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> RECONCILIATION_SCHEDULE_SPREAD =
       Arg.create(Amount.of(30L, Time.MINUTES));
 
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface BackgroundWorker { }
+
   @Override
   protected void configure() {
     install(new PrivateModule() {
@@ -109,6 +127,8 @@ public class ReconciliationModule extends AbstractModule {
             RECONCILIATION_EXPLICIT_INTERVAL.get(),
             RECONCILIATION_IMPLICIT_INTERVAL.get(),
             RECONCILIATION_SCHEDULE_SPREAD.get()));
+        bind(ScheduledExecutorService.class).annotatedWith(BackgroundWorker.class)
+            .toInstance(AsyncUtil.loggingScheduledExecutor(1, "TaskReconciler-%d", LOG));
         bind(TaskReconciler.class).in(Singleton.class);
         expose(TaskReconciler.class);
       }

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
index 653e52b..8f866a8 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
@@ -27,10 +27,10 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.StatsProvider;
 
-import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.reconciliation.ReconciliationModule.BackgroundWorker;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos;
@@ -91,7 +91,7 @@ public class TaskReconciler extends AbstractIdleService {
       TaskReconcilerSettings settings,
       Storage storage,
       Driver driver,
-      @AsyncExecutor ScheduledExecutorService executor,
+      @BackgroundWorker ScheduledExecutorService executor,
       StatsProvider stats) {
 
     this.settings = requireNonNull(settings);

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
index fb83972..72a46f0 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.reconciliation;
 
 import java.util.EnumSet;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
 
@@ -31,6 +30,7 @@ import com.twitter.common.stats.StatsProvider;
 
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateChangeResult;
@@ -65,7 +65,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
       ScheduleStatus.KILLING,
       ScheduleStatus.DRAINING);
 
-  private final ScheduledExecutorService executor;
+  private final DelayExecutor executor;
   private final Storage storage;
   private final StateManager stateManager;
   private final Amount<Long, Time> timeout;
@@ -73,7 +73,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
 
   @Inject
   TaskTimeout(
-      @AsyncExecutor ScheduledExecutorService executor,
+      @AsyncExecutor DelayExecutor executor,
       Storage storage,
       StateManager stateManager,
       Amount<Long, Time> timeout,
@@ -138,10 +138,9 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
         // Our service is not yet started.  We don't want to lose track of the task, so
         // we will try again later.
         LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY);
-        executor.schedule(
-            this,
-            NOT_STARTED_RETRY.getValue(),
-            NOT_STARTED_RETRY.getUnit().getTimeUnit());
+        // TODO(wfarner): This execution should not wait for a transaction, but a second executor
+        // would be weird.
+        executor.execute(this, NOT_STARTED_RETRY);
       }
     }
   }
@@ -149,10 +148,9 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
   @Subscribe
   public void recordStateChange(TaskStateChange change) {
     if (isTransient(change.getNewState())) {
-      executor.schedule(
+      executor.execute(
           new TimedOutTaskHandler(change.getTaskId(), change.getNewState()),
-          timeout.getValue(),
-          timeout.getUnit().getTimeUnit());
+          timeout);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
index e60daad..eaf784e 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -14,13 +14,9 @@
 package org.apache.aurora.scheduler.scheduling;
 
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -28,16 +24,13 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.Command;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.SlidingStats;
-import com.twitter.common.stats.Stats;
 import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
-import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -47,7 +40,6 @@ import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 
@@ -63,10 +55,8 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
  */
 public class TaskGroups implements EventSubscriber {
 
-  private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
-
   private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap();
-  private final ScheduledExecutorService executor;
+  private final DelayExecutor executor;
   private final TaskScheduler taskScheduler;
   private final long firstScheduleDelay;
   private final BackoffStrategy backoff;
@@ -95,43 +85,25 @@ public class TaskGroups implements EventSubscriber {
 
   @Inject
   TaskGroups(
-      ShutdownRegistry shutdownRegistry,
+      @AsyncExecutor DelayExecutor executor,
       TaskGroupsSettings settings,
       TaskScheduler taskScheduler,
       RescheduleCalculator rescheduleCalculator) {
 
-    this(
-        createThreadPool(shutdownRegistry),
-        settings.firstScheduleDelay,
-        settings.taskGroupBackoff,
-        settings.rateLimiter,
-        taskScheduler,
-        rescheduleCalculator);
-  }
-
-  @VisibleForTesting
-  TaskGroups(
-      final ScheduledExecutorService executor,
-      final Amount<Long, Time> firstScheduleDelay,
-      final BackoffStrategy backoff,
-      final RateLimiter rateLimiter,
-      final TaskScheduler taskScheduler,
-      final RescheduleCalculator rescheduleCalculator) {
-
-    requireNonNull(firstScheduleDelay);
-    Preconditions.checkArgument(firstScheduleDelay.getValue() > 0);
+    requireNonNull(settings.firstScheduleDelay);
+    Preconditions.checkArgument(settings.firstScheduleDelay.getValue() > 0);
 
     this.executor = requireNonNull(executor);
-    requireNonNull(rateLimiter);
+    requireNonNull(settings.rateLimiter);
     requireNonNull(taskScheduler);
-    this.firstScheduleDelay = firstScheduleDelay.as(Time.MILLISECONDS);
-    this.backoff = requireNonNull(backoff);
+    this.firstScheduleDelay = settings.firstScheduleDelay.as(Time.MILLISECONDS);
+    this.backoff = requireNonNull(settings.taskGroupBackoff);
     this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
 
     this.taskScheduler = new TaskScheduler() {
       @Override
       public boolean schedule(String taskId) {
-        rateLimiter.acquire();
+        settings.rateLimiter.acquire();
         return taskScheduler.schedule(taskId);
       }
     };
@@ -141,7 +113,7 @@ public class TaskGroups implements EventSubscriber {
     // Avoid check-then-act by holding the intrinsic lock.  If not done atomically, we could
     // remove a group while a task is being added to it.
     if (group.hasMore()) {
-      executor.schedule(evaluate, group.getPenaltyMs(), MILLISECONDS);
+      executor.execute(evaluate, Amount.of(group.getPenaltyMs(), Time.MILLISECONDS));
     } else {
       groups.remove(group.getKey());
     }
@@ -172,20 +144,6 @@ public class TaskGroups implements EventSubscriber {
     evaluateGroupLater(monitor, group);
   }
 
-  private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
-    final ScheduledThreadPoolExecutor executor =
-        AsyncUtil.singleThreadLoggingScheduledExecutor("TaskScheduler-%d", LOG);
-
-    Stats.exportSize("schedule_queue_size", executor.getQueue());
-    shutdownRegistry.addAction(new Command() {
-      @Override
-      public void execute() {
-        new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
-      }
-    });
-    return executor;
-  }
-
   /**
    * Informs the task groups of a task state change.
    * <p>

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
index e54e6c4..fdc5bd7 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
@@ -13,17 +13,17 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import javax.inject.Inject;
 
 import com.google.common.base.Optional;
 import com.google.common.eventbus.Subscribe;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.SlidingStats;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -45,7 +45,7 @@ class TaskThrottler implements EventSubscriber {
 
   private final RescheduleCalculator rescheduleCalculator;
   private final Clock clock;
-  private final ScheduledExecutorService executor;
+  private final DelayExecutor executor;
   private final Storage storage;
   private final StateManager stateManager;
 
@@ -55,7 +55,7 @@ class TaskThrottler implements EventSubscriber {
   TaskThrottler(
       RescheduleCalculator rescheduleCalculator,
       Clock clock,
-      @AsyncExecutor ScheduledExecutorService executor,
+      @AsyncExecutor DelayExecutor executor,
       Storage storage,
       StateManager stateManager) {
 
@@ -73,7 +73,7 @@ class TaskThrottler implements EventSubscriber {
           + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask());
       long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
       throttleStats.accumulate(delayMs);
-      executor.schedule(
+      executor.execute(
           new Runnable() {
             @Override
             public void run() {
@@ -90,8 +90,7 @@ class TaskThrottler implements EventSubscriber {
               });
             }
           },
-          delayMs,
-          TimeUnit.MILLISECONDS);
+          Amount.of(delayMs, Time.MILLISECONDS));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
index ed92661..f0620b9 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
@@ -30,6 +30,7 @@ import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
+import com.google.inject.util.Modules;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
 import com.twitter.common.inject.Bindings.KeyFactory;
@@ -37,6 +38,8 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.FlushableWorkQueue;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.CronJobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -149,15 +152,23 @@ public final class DbModule extends PrivateModule {
    */
   @VisibleForTesting
   public static Module testModule(KeyFactory keyFactory, Optional<Module> taskStoreModule) {
-    return new DbModule(
-        keyFactory,
-        taskStoreModule.isPresent() ? taskStoreModule.get() : getTaskStoreModule(keyFactory),
-        "testdb-" + UUID.randomUUID().toString(),
-        // A non-zero close delay is used here to avoid eager database cleanup in tests that
-        // make use of multiple threads.  Since all test databases are separately scoped by the
-        // included UUID, multiple DB instances will overlap in time but they should be distinct
-        // in content.
-        ImmutableMap.of("DB_CLOSE_DELAY", "5"));
+    return Modules.combine(
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance(() -> { });
+          }
+        },
+        new DbModule(
+            keyFactory,
+            taskStoreModule.isPresent() ? taskStoreModule.get() : getTaskStoreModule(keyFactory),
+            "testdb-" + UUID.randomUUID().toString(),
+            // A non-zero close delay is used here to avoid eager database cleanup in tests that
+            // make use of multiple threads.  Since all test databases are separately scoped by the
+            // included UUID, multiple DB instances will overlap in time but they should be distinct
+            // in content.
+            ImmutableMap.of("DB_CLOSE_DELAY", "5"))
+    );
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index a1f0d3c..aac62e2 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -28,6 +28,8 @@ import org.apache.aurora.gen.JobUpdateAction;
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.FlushableWorkQueue;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.CronJobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -59,11 +61,13 @@ class DbStorage extends AbstractIdleService implements Storage {
   private final SqlSessionFactory sessionFactory;
   private final MutableStoreProvider storeProvider;
   private final EnumValueMapper enumValueMapper;
+  private final FlushableWorkQueue postTransactionWork;
 
   @Inject
   DbStorage(
       SqlSessionFactory sessionFactory,
       EnumValueMapper enumValueMapper,
+      @AsyncExecutor FlushableWorkQueue postTransactionWork,
       final CronJobStore.Mutable cronJobStore,
       final TaskStore.Mutable taskStore,
       final SchedulerStore.Mutable schedulerStore,
@@ -74,6 +78,7 @@ class DbStorage extends AbstractIdleService implements Storage {
 
     this.sessionFactory = requireNonNull(sessionFactory);
     this.enumValueMapper = requireNonNull(enumValueMapper);
+    this.postTransactionWork = requireNonNull(postTransactionWork);
     requireNonNull(cronJobStore);
     requireNonNull(taskStore);
     requireNonNull(schedulerStore);
@@ -139,11 +144,23 @@ class DbStorage extends AbstractIdleService implements Storage {
   @Override
   @Transactional
   public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
-    try {
-      return work.apply(storeProvider);
+    T result;
+    try (SqlSession session = sessionFactory.openSession(false)) {
+      result = work.apply(storeProvider);
+      session.commit();
     } catch (PersistenceException e) {
       throw new StorageException(e.getMessage(), e);
+    } finally {
+      // NOTE: Async work is intentionally executed regardless of whether the transaction succeeded.
+      // Doing otherwise runs the risk of cross-talk between transactions and losing async tasks
+      // due to failure of an unrelated transaction.  This matches behavior prior to the
+      // introduction of DbStorage, but should be revisited.
+      // TODO(wfarner): Consider revisiting to execute async work only when the transaction is
+      // successful.
+      postTransactionWork.flush();
     }
+
+    return result;
   }
 
   @VisibleForTesting
@@ -169,6 +186,8 @@ class DbStorage extends AbstractIdleService implements Storage {
       } finally {
         session.update(ENABLE_UNDO_LOG);
       }
+    } finally {
+      postTransactionWork.flush();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
index 3a86614..06e7f23 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
@@ -63,6 +63,8 @@ public final class DbUtil {
 
   /**
    * Creates a new, empty test storage system.
+   * <p>
+   * TODO(wfarner): Rename this to createTestStorage() to avoid misuse.
    *
    * @return A new storage instance.
    */

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
index 5384307..9d66685 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
@@ -29,6 +29,7 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.scheduler.AppStartup;
+import org.apache.aurora.scheduler.async.AsyncModule.RegisterGauges;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.junit.Before;
@@ -86,7 +87,10 @@ public class AsyncModuleTest extends EasyMockTest {
     injector.getBindings();
 
     assertEquals(
-        ImmutableMap.of(AsyncModule.TIMEOUT_QUEUE_GAUGE, 0, AsyncModule.ASYNC_TASKS_GAUGE, 0L),
+        ImmutableMap.of(
+            RegisterGauges.TIMEOUT_QUEUE_GAUGE, 0,
+            RegisterGauges.ASYNC_TASKS_GAUGE, 0L,
+            RegisterGauges.DELAY_QUEUE_GAUGE, 0),
         statsProvider.getAllValues()
     );
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
index 91b91bc..6a17d3a 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
@@ -41,12 +41,14 @@ import com.twitter.common.net.pool.DynamicHostSet;
 import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.BackoffStrategy;
 import com.twitter.thrift.ServiceInstance;
 
 import org.apache.aurora.gen.ServerInfo;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.async.AsyncModule;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler;
 import org.apache.aurora.scheduler.offers.OfferManager;
@@ -57,6 +59,7 @@ import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.Capture;
 import org.junit.Before;
 
@@ -96,6 +99,7 @@ public abstract class JettyServerModuleTest extends EasyMockTest {
         new StatsModule(),
         new LifecycleModule(),
         new SchedulerServicesModule(),
+        new AsyncModule(),
         new AbstractModule() {
           <T> T bindMock(Class<T> clazz) {
             T mock = createMock(clazz);
@@ -105,6 +109,7 @@ public abstract class JettyServerModuleTest extends EasyMockTest {
 
           @Override
           protected void configure() {
+            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
             bind(Storage.class).toInstance(storage.storage);
             bind(IServerInfo.class).toInstance(IServerInfo.build(new ServerInfo()
                 .setClusterName("unittest")

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java b/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java
index e725e10..a5703e5 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.http.api.security;
 
 import java.io.IOException;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableSet;
@@ -25,7 +24,6 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Module;
 import com.google.inject.util.Modules;
 import com.sun.jersey.api.client.ClientResponse;
-import com.twitter.common.stats.StatsProvider;
 
 import org.apache.aurora.gen.AuroraAdmin;
 import org.apache.aurora.gen.Lock;
@@ -62,7 +60,6 @@ import org.junit.Test;
 import static org.apache.aurora.scheduler.http.H2ConsoleModule.H2_PATH;
 import static org.apache.aurora.scheduler.http.H2ConsoleModule.H2_PERM;
 import static org.apache.aurora.scheduler.http.api.ApiModule.API_PATH;
-import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -98,7 +95,6 @@ public class HttpSecurityIT extends JettyServerModuleTest {
 
   private Ini ini;
   private AnnotatedAuroraAdmin auroraAdmin;
-  private StatsProvider statsProvider;
 
   private static final Joiner COMMA_JOINER = Joiner.on(", ");
   private static final String ADMIN_ROLE = "admin";
@@ -138,8 +134,6 @@ public class HttpSecurityIT extends JettyServerModuleTest {
     roles.put(H2_ROLE, H2_PERM);
 
     auroraAdmin = createMock(AnnotatedAuroraAdmin.class);
-    statsProvider = createMock(StatsProvider.class);
-    expect(statsProvider.makeCounter(anyString())).andStubReturn(new AtomicLong());
   }
 
   @Override
@@ -152,7 +146,6 @@ public class HttpSecurityIT extends JettyServerModuleTest {
           @Override
           protected void configure() {
             MockDecoratedThrift.bindForwardedMock(binder(), auroraAdmin);
-            bind(StatsProvider.class).toInstance(statsProvider);
           }
         });
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 088a4a6..1cc9ec4 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.offers;
 
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Level;
 
 import com.google.common.base.Optional;
@@ -29,6 +28,7 @@ import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
@@ -85,8 +85,8 @@ public class OfferManagerImplTest extends EasyMockTest {
       }
     });
     driver = createMock(Driver.class);
-    ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
-    clock = FakeScheduledExecutor.scheduleExecutor(executorMock);
+    DelayExecutor executorMock = createMock(DelayExecutor.class);
+    clock = FakeScheduledExecutor.fromDelayExecutor(executorMock);
 
     addTearDown(new TearDown() {
       @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 532b0ea..892861d 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -14,20 +14,22 @@
 package org.apache.aurora.scheduler.pruning;
 
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
 import com.twitter.common.base.Command;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
@@ -39,6 +41,10 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.AsyncModule;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
+import org.apache.aurora.scheduler.async.FlushableWorkQueue;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
@@ -46,6 +52,7 @@ import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
@@ -72,8 +79,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
   private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
   private static final int PER_JOB_HISTORY = 2;
 
-  private ScheduledFuture<?> future;
-  private ScheduledExecutorService executor;
+  private DelayExecutor executor;
   private FakeClock clock;
   private StateManager stateManager;
   private StorageTestUtil storageUtil;
@@ -81,8 +87,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
 
   @Before
   public void setUp() {
-    future = createMock(new Clazz<ScheduledFuture<?>>() { });
-    executor = createMock(ScheduledExecutorService.class);
+    executor = createMock(DelayExecutor.class);
     clock = new FakeClock();
     stateManager = createMock(StateManager.class);
     storageUtil = new StorageTestUtil(this);
@@ -232,14 +237,29 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
     // lock. This causes a deadlock when the executor tries to acquire a lock held by the event
     // fired.
 
-    pruner = prunerWithRealExecutor();
+    ScheduledThreadPoolExecutor realExecutor = new ScheduledThreadPoolExecutor(1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("testThreadSafeEvents-executor")
+            .build());
+    Injector injector = Guice.createInjector(
+        new AsyncModule(realExecutor),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+          }
+        });
+    executor = injector.getInstance(Key.get(DelayExecutor.class, AsyncExecutor.class));
+    FlushableWorkQueue flusher =
+        injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class));
+
+    pruner = buildPruner(executor);
     Command onDeleted = new Command() {
       @Override
       public void execute() {
         // The goal is to verify that the call does not deadlock. We do not care about the outcome.
-        IScheduledTask b = makeTask("b", ASSIGNED);
-
-        changeState(b, STARTING);
+        changeState(makeTask("b", ASSIGNED), STARTING);
       }
     };
     CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
@@ -248,17 +268,13 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
 
     // Change the task to a terminal state and wait for it to be pruned.
     changeState(makeTask(TASK_ID, RUNNING), KILLED);
+    flusher.flush();
     taskDeleted.await();
   }
 
-  private TaskHistoryPruner prunerWithRealExecutor() {
-    ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("testThreadSafeEvents-executor")
-            .build());
+  private TaskHistoryPruner buildPruner(DelayExecutor delayExecutor) {
     return new TaskHistoryPruner(
-        realExecutor,
+        delayExecutor,
         stateManager,
         clock,
         new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
@@ -326,7 +342,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
       IScheduledTask... pruned) {
 
     // Expect a deferred prune operation when a new task is being watched.
-    executor.submit(EasyMock.<Runnable>anyObject());
+    executor.execute(EasyMock.<Runnable>anyObject());
     expectLastCall().andAnswer(
         new IAnswer<Future<?>>() {
           @Override
@@ -348,11 +364,9 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
 
   private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
     Capture<Runnable> capture = createCapture();
-    executor.schedule(
+    executor.execute(
         EasyMock.capture(capture),
-        eq(pruner.calculateTimeout(timestampMillis)),
-        eq(TimeUnit.MILLISECONDS));
-    expectLastCall().andReturn(future).times(count);
+        eq(Amount.of(pruner.calculateTimeout(timestampMillis), Time.MILLISECONDS)));
     return capture;
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
index 26f65fa..957cbd0 100644
--- a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
@@ -14,12 +14,10 @@
 package org.apache.aurora.scheduler.reconciliation;
 
 import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.concurrent.ScheduledExecutorService;
 
 import javax.inject.Singleton;
 
 import com.google.common.eventbus.EventBus;
-import com.google.common.testing.TearDown;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -34,6 +32,7 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
@@ -67,14 +66,9 @@ public class KillRetryTest extends EasyMockTest {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     backoffStrategy = createMock(BackoffStrategy.class);
-    final ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
-    clock = FakeScheduledExecutor.scheduleExecutor(executorMock);
-    addTearDown(new TearDown() {
-      @Override
-      public void tearDown() {
-        clock.assertEmpty();
-      }
-    });
+    final DelayExecutor executorMock = createMock(DelayExecutor.class);
+    clock = FakeScheduledExecutor.fromDelayExecutor(executorMock);
+    addTearDown(clock::assertEmpty);
     statsProvider = new FakeStatsProvider();
 
     Injector injector = Guice.createInjector(
@@ -85,8 +79,7 @@ public class KillRetryTest extends EasyMockTest {
           protected void configure() {
             bind(Driver.class).toInstance(driver);
             bind(Storage.class).toInstance(storageUtil.storage);
-            bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class)
-                .toInstance(executorMock);
+            bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).toInstance(executorMock);
             PubsubEventModule.bindSubscriber(binder(), KillRetry.class);
             bind(KillRetry.class).in(Singleton.class);
             bind(BackoffStrategy.class).toInstance(backoffStrategy);

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
index 97d25f9..2bcda70 100644
--- a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
@@ -13,8 +13,6 @@
  */
 package org.apache.aurora.scheduler.reconciliation;
 
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
@@ -30,6 +28,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -52,7 +51,6 @@ import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 
 public class TaskTimeoutTest extends EasyMockTest {
@@ -61,9 +59,8 @@ public class TaskTimeoutTest extends EasyMockTest {
   private static final Amount<Long, Time> TIMEOUT = Amount.of(1L, Time.MINUTES);
 
   private AtomicLong timedOutTaskCounter;
-  private ScheduledExecutorService executor;
+  private DelayExecutor executor;
   private StorageTestUtil storageUtil;
-  private ScheduledFuture<?> future;
   private StateManager stateManager;
   private FakeClock clock;
   private TaskTimeout timeout;
@@ -71,10 +68,9 @@ public class TaskTimeoutTest extends EasyMockTest {
 
   @Before
   public void setUp() {
-    executor = createMock(ScheduledExecutorService.class);
+    executor = createMock(DelayExecutor.class);
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
-    future = createMock(new Clazz<ScheduledFuture<?>>() { });
     stateManager = createMock(StateManager.class);
     clock = new FakeClock();
     statsProvider = createMock(StatsProvider.class);
@@ -96,11 +92,7 @@ public class TaskTimeoutTest extends EasyMockTest {
 
   private Capture<Runnable> expectTaskWatch(Amount<Long, Time> expireIn) {
     Capture<Runnable> capture = createCapture();
-    executor.schedule(
-        EasyMock.capture(capture),
-        eq((long) expireIn.getValue()),
-        eq(expireIn.getUnit().getTimeUnit()));
-    expectLastCall().andReturn(future);
+    executor.execute(EasyMock.capture(capture), eq(expireIn));
     return capture;
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
index 55aad35..6bcfefd 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
@@ -13,8 +13,6 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
-import java.util.concurrent.ScheduledExecutorService;
-
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.RateLimiter;
 import com.twitter.common.quantity.Amount;
@@ -27,9 +25,11 @@ import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
@@ -56,17 +56,15 @@ public class TaskGroupsTest extends EasyMockTest {
 
   @Before
   public void setUp() throws Exception {
-    ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
-    clock = FakeScheduledExecutor.scheduleExecutor(executor);
+    DelayExecutor executor = createMock(DelayExecutor.class);
+    clock = FakeScheduledExecutor.fromDelayExecutor(executor);
     backoffStrategy = createMock(BackoffStrategy.class);
     taskScheduler = createMock(TaskScheduler.class);
     rateLimiter = createMock(RateLimiter.class);
     rescheduleCalculator = createMock(RescheduleCalculator.class);
     taskGroups = new TaskGroups(
         executor,
-        FIRST_SCHEDULE_DELAY,
-        backoffStrategy,
-        rateLimiter,
+        new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter),
         taskScheduler,
         rescheduleCalculator);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
index 4055021..ba08fe5 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
@@ -13,9 +13,6 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.twitter.common.quantity.Amount;
@@ -27,6 +24,7 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateChangeResult;
@@ -49,7 +47,7 @@ public class TaskThrottlerTest extends EasyMockTest {
 
   private RescheduleCalculator rescheduleCalculator;
   private FakeClock clock;
-  private ScheduledExecutorService executor;
+  private DelayExecutor executor;
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private TaskThrottler throttler;
@@ -58,7 +56,7 @@ public class TaskThrottlerTest extends EasyMockTest {
   public void setUp() throws Exception {
     rescheduleCalculator = createMock(RescheduleCalculator.class);
     clock = new FakeClock();
-    executor = createMock(ScheduledExecutorService.class);
+    executor = createMock(DelayExecutor.class);
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
@@ -116,11 +114,9 @@ public class TaskThrottlerTest extends EasyMockTest {
 
   private Capture<Runnable> expectThrottled(long penaltyMs) {
     Capture<Runnable> stateChangeCapture = createCapture();
-    expect(executor.schedule(
+    executor.execute(
         capture(stateChangeCapture),
-        eq(penaltyMs),
-        eq(TimeUnit.MILLISECONDS)))
-        .andReturn(null);
+        eq(Amount.of(penaltyMs, Time.MILLISECONDS)));
     return stateChangeCapture;
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
index a4bcdd7..3b05db9 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.storage.db;
 
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.scheduler.async.FlushableWorkQueue;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.CronJobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -40,6 +41,7 @@ public class DbStorageTest extends EasyMockTest {
   private SqlSessionFactory sessionFactory;
   private SqlSession session;
   private EnumValueMapper enumMapper;
+  private FlushableWorkQueue flusher;
   private Work.Quiet<String> readWork;
   private MutateWork.NoResult.Quiet writeWork;
 
@@ -50,12 +52,14 @@ public class DbStorageTest extends EasyMockTest {
     sessionFactory = createMock(SqlSessionFactory.class);
     session = createMock(SqlSession.class);
     enumMapper = createMock(EnumValueMapper.class);
+    flusher = createMock(FlushableWorkQueue.class);
     readWork = createMock(new Clazz<Work.Quiet<String>>() { });
     writeWork = createMock(new Clazz<MutateWork.NoResult.Quiet>() { });
 
     storage = new DbStorage(
         sessionFactory,
         enumMapper,
+        flusher,
         createMock(CronJobStore.Mutable.class),
         createMock(TaskStore.Mutable.class),
         createMock(SchedulerStore.Mutable.class),
@@ -89,6 +93,7 @@ public class DbStorageTest extends EasyMockTest {
     expect(sessionFactory.openSession(false)).andReturn(session);
     expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException());
     expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
+    flusher.flush();
 
     control.replay();
 
@@ -102,6 +107,7 @@ public class DbStorageTest extends EasyMockTest {
     expect(writeWork.apply(EasyMock.anyObject())).andReturn(null);
     session.close();
     expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
+    flusher.flush();
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
index e198c1c..48978ec 100644
--- a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
+++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
@@ -27,6 +27,7 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.testing.FakeClock;
 
+import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 
@@ -43,6 +44,7 @@ public final class FakeScheduledExecutor extends FakeClock {
 
   private FakeScheduledExecutor() { }
 
+  // TODO(wfarner): Rename to fromScheduledExecutor().
   public static FakeScheduledExecutor scheduleExecutor(ScheduledExecutorService mock) {
     FakeScheduledExecutor executor = new FakeScheduledExecutor();
     mock.schedule(
@@ -57,6 +59,33 @@ public final class FakeScheduledExecutor extends FakeClock {
     return executor;
   }
 
+  private static IAnswer<Object> answerExecuteWithDelay(final FakeScheduledExecutor executor) {
+    return new IAnswer<Object>() {
+      @Override
+      public Object answer() {
+        Object[] args = EasyMock.getCurrentArguments();
+        Runnable work = (Runnable) args[0];
+        @SuppressWarnings("unchecked")
+        Amount<Long, Time> delay = (Amount<Long, Time>) args[1];
+        addDelayedWork(executor, delay.as(Time.MILLISECONDS), work);
+        return null;
+      }
+    };
+  }
+
+  public static FakeScheduledExecutor fromDelayExecutor(DelayExecutor mock) {
+    FakeScheduledExecutor executor = new FakeScheduledExecutor();
+    mock.execute(
+        EasyMock.<Runnable>anyObject(),
+        EasyMock.<Amount<Long, Time>>anyObject());
+    expectLastCall().andAnswer(answerExecuteWithDelay(executor)).anyTimes();
+
+    mock.execute(EasyMock.anyObject());
+    expectLastCall().andAnswer(answerExecute()).anyTimes();
+
+    return executor;
+  }
+
   private static IAnswer<Void> answerExecute() {
     return new IAnswer<Void>() {
       @Override
@@ -69,10 +98,10 @@ public final class FakeScheduledExecutor extends FakeClock {
     };
   }
 
-  private static IAnswer<ScheduledFuture<?>> answerSchedule(final FakeScheduledExecutor executor) {
-    return new IAnswer<ScheduledFuture<?>>() {
+  private static IAnswer<Object> answerSchedule(final FakeScheduledExecutor executor) {
+    return new IAnswer<Object>() {
       @Override
-      public ScheduledFuture<?> answer() {
+      public Object answer() {
         Object[] args = EasyMock.getCurrentArguments();
         Runnable work = (Runnable) args[0];
         long value = (Long) args[1];