You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/09/15 03:29:26 UTC

aurora git commit: Make async work queue gating thread-local.

Repository: aurora
Updated Branches:
  refs/heads/master 5dccf92f4 -> e658a8a0b


Make async work queue gating thread-local.

Bugs closed: AURORA-1459

Reviewed at https://reviews.apache.org/r/38336/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/e658a8a0
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/e658a8a0
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/e658a8a0

Branch: refs/heads/master
Commit: e658a8a0b71e894a03ffc81dcae981db9d68bcb4
Parents: 5dccf92
Author: Bill Farner <wf...@apache.org>
Authored: Mon Sep 14 18:29:22 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Sep 14 18:29:22 2015 -0700

----------------------------------------------------------------------
 config/findbugs/excludeFilter.xml               |   2 +-
 .../aurora/scheduler/async/AsyncModule.java     |  14 +-
 .../scheduler/async/FlushableWorkQueue.java     |  25 ---
 .../scheduler/async/GatedDelayExecutor.java     |  71 --------
 .../aurora/scheduler/async/GatedWorkQueue.java  |  41 +++++
 .../scheduler/async/GatingDelayExecutor.java    |  98 +++++++++++
 .../aurora/scheduler/storage/db/DbModule.java   |  12 +-
 .../aurora/scheduler/storage/db/DbStorage.java  |  82 ++++-----
 .../aurora/scheduler/app/SchedulerIT.java       |   4 -
 .../scheduler/async/GatedDelayExecutorTest.java |  91 ----------
 .../async/GatingDelayExecutorTest.java          | 170 +++++++++++++++++++
 .../pruning/TaskHistoryPrunerTest.java          |  33 ++--
 .../scheduler/storage/db/DbStorageTest.java     |  54 +++---
 13 files changed, 411 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/config/findbugs/excludeFilter.xml
----------------------------------------------------------------------
diff --git a/config/findbugs/excludeFilter.xml b/config/findbugs/excludeFilter.xml
index 7c65302..fe3f4ca 100644
--- a/config/findbugs/excludeFilter.xml
+++ b/config/findbugs/excludeFilter.xml
@@ -114,7 +114,7 @@ limitations under the License.
   <Match>
     <!-- False positives on a check introduced in findbugs 3.0.1 -->
     <Or>
-      <Class name="org.apache.aurora.scheduler.storage.db.DbStorage" />
+      <Class name="org.apache.aurora.scheduler.storage.db.DbStorage$3" />
       <Class name="org.apache.aurora.scheduler.http.api.security.AuthorizeHeaderTokenTest" />
     </Or>
     <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT" />

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 217b9c0..eccb864 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -75,8 +75,8 @@ public class AsyncModule extends AbstractModule {
         bind(ScheduledThreadPoolExecutor.class).toInstance(afterTransaction);
         bind(ScheduledExecutorService.class).toInstance(afterTransaction);
 
-        bind(GatedDelayExecutor.class).in(Singleton.class);
-        expose(GatedDelayExecutor.class);
+        bind(GatingDelayExecutor.class).in(Singleton.class);
+        expose(GatingDelayExecutor.class);
 
         bind(RegisterGauges.class).in(Singleton.class);
         expose(RegisterGauges.class);
@@ -84,9 +84,9 @@ public class AsyncModule extends AbstractModule {
     });
     SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class);
 
-    bind(Executor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
-    bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
-    bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
+    bind(Executor.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class);
+    bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class);
+    bind(GatedWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class);
   }
 
   static class RegisterGauges extends AbstractIdleService {
@@ -101,13 +101,13 @@ public class AsyncModule extends AbstractModule {
 
     private final StatsProvider statsProvider;
     private final ScheduledThreadPoolExecutor executor;
-    private final GatedDelayExecutor delayExecutor;
+    private final GatingDelayExecutor delayExecutor;
 
     @Inject
     RegisterGauges(
         StatsProvider statsProvider,
         ScheduledThreadPoolExecutor executor,
-        GatedDelayExecutor delayExecutor) {
+        GatingDelayExecutor delayExecutor) {
 
       this.statsProvider = requireNonNull(statsProvider);
       this.executor = requireNonNull(executor);

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java b/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java
deleted file mode 100644
index 11a1c2a..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-/**
- * A work queue that only executes pending work when flushed.
- */
-public interface FlushableWorkQueue {
-
-  /**
-   * Makes pending work available for execution.
-   */
-  void flush();
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
deleted file mode 100644
index 9d4cfcf..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Queue;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.inject.Inject;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * An executor that queues work until flushed.
- */
-class GatedDelayExecutor implements DelayExecutor, FlushableWorkQueue {
-
-  private final ScheduledExecutorService executor;
-  private final Queue<Runnable> queue = Lists.newLinkedList();
-
-  /**
-   * Creates a gated delay executor that will flush work to the provided {@code delegate}.
-   *
-   * @param delegate Delegate to execute work with when flushed.
-   */
-  @Inject
-  GatedDelayExecutor(ScheduledExecutorService delegate) {
-    this.executor = requireNonNull(delegate);
-  }
-
-  synchronized int getQueueSize() {
-    return queue.size();
-  }
-
-  private synchronized void enqueue(Runnable work) {
-    queue.add(work);
-  }
-
-  @Override
-  public synchronized void flush() {
-    for (Runnable work : Iterables.consumingIterable(queue)) {
-      work.run();
-    }
-  }
-
-  @Override
-  public synchronized void execute(Runnable command) {
-    enqueue(() -> executor.execute(command));
-  }
-
-  @Override
-  public synchronized void execute(Runnable work, Amount<Long, Time> minDelay) {
-    enqueue(() -> executor.schedule(work, minDelay.getValue(), minDelay.getUnit().getTimeUnit()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java b/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java
new file mode 100644
index 0000000..7032271
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+/**
+ * A work queue whose pending work is held back while its gate is closed.
+ */
+public interface GatedWorkQueue {
+
+  /**
+   * Closes the gate on the work queue for the duration of an operation.
+   *
+   * @param operation Operation to execute while keeping the gate closed.
+   * @param <T> Operation return type.
+   * @param <E> Operation exception type.
+   * @return The value returned by the {@code operation}.
+   * @throws E Exception thrown by the {@code operation}.
+   */
+  <T, E extends Exception> T closeDuring(GatedOperation<T, E> operation) throws E;
+
+  /**
+   * An operation to run while new items are prevented from executing on the work queue.
+   *
+   * @param <T> Operation return type.
+   * @param <E> Operation exception type.
+   */
+  interface GatedOperation<T, E extends Exception> {
+    T doWithGateClosed() throws E;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java
new file mode 100644
index 0000000..a7240ae
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Queue;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.inject.Inject;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An executor that may be temporarily gated with {@link #closeDuring(GatedOperation)}.  Gating is
+ * per-thread: work submitted by a gated thread is deferred, and is released only when that same
+ * thread's outermost {@link #closeDuring(GatedOperation)} call returns, never by another thread.
+ */
+class GatingDelayExecutor implements DelayExecutor, GatedWorkQueue {
+
+  private final ScheduledExecutorService gated;
+  // Work deferred by the calling thread; null while that thread's gate is open.
+  private final ThreadLocal<Queue<Runnable>> deferred = new ThreadLocal<>();
+  // Amount of deferred work across all threads (guarded by this), readable from any thread.
+  private int queueSize;
+
+  /**
+   * Creates a gating delay executor that will gate work from the provided executor.
+   *
+   * @param gated Delegate to execute work with when ungated.
+   */
+  @Inject
+  GatingDelayExecutor(ScheduledExecutorService gated) {
+    this.gated = requireNonNull(gated);
+  }
+
+  @Override
+  public <T, E extends Exception> T closeDuring(GatedOperation<T, E> operation) throws E {
+    // Only the outermost call on a thread opens the gate again; reentrant calls share its queue.
+    Queue<Runnable> outer = deferred.get();
+    Queue<Runnable> pending = outer == null ? Lists.<Runnable>newLinkedList() : outer;
+    deferred.set(pending);
+    try {
+      return operation.doWithGateClosed();
+    } finally {
+      if (outer == null) {
+        deferred.remove();
+        flush(pending);
+      }
+    }
+  }
+
+  synchronized int getQueueSize() {
+    return queueSize;
+  }
+
+  private synchronized void enqueue(Runnable work) {
+    Queue<Runnable> pending = deferred.get();
+    if (pending == null) {
+      work.run();
+    } else {
+      pending.add(work);
+      queueSize++;
+    }
+  }
+
+  private synchronized void flush(Queue<Runnable> pending) {
+    queueSize -= pending.size();
+    for (Runnable work : Iterables.consumingIterable(pending)) {
+      work.run();
+    }
+  }
+
+  @Override
+  public synchronized void execute(Runnable command) {
+    enqueue(() -> gated.execute(command));
+  }
+
+  @Override
+  public synchronized void execute(Runnable work, Amount<Long, Time> minDelay) {
+    enqueue(() -> gated.schedule(work, minDelay.getValue(), minDelay.getUnit().getTimeUnit()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
index 6da6193..e3efbdb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
@@ -39,7 +39,7 @@ import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.FlushableWorkQueue;
+import org.apache.aurora.scheduler.async.GatedWorkQueue;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.CronJobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -156,7 +156,15 @@ public final class DbModule extends PrivateModule {
         new AbstractModule() {
           @Override
           protected void configure() {
-            bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance(() -> { });
+            bind(GatedWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance(
+                new GatedWorkQueue() {
+                  @Override
+                  public <T, E extends Exception> T closeDuring(
+                      GatedOperation<T, E> operation) throws E {
+
+                    return operation.doWithGateClosed();
+                  }
+                });
           }
         },
         new DbModule(

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index 6036570..dd7e1d3 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -29,7 +29,8 @@ import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.FlushableWorkQueue;
+import org.apache.aurora.scheduler.async.GatedWorkQueue;
+import org.apache.aurora.scheduler.async.GatedWorkQueue.GatedOperation;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.CronJobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -61,13 +62,13 @@ class DbStorage extends AbstractIdleService implements Storage {
   private final SqlSessionFactory sessionFactory;
   private final MutableStoreProvider storeProvider;
   private final EnumValueMapper enumValueMapper;
-  private final FlushableWorkQueue postTransactionWork;
+  private final GatedWorkQueue gatedWorkQueue;
 
   @Inject
   DbStorage(
       SqlSessionFactory sessionFactory,
       EnumValueMapper enumValueMapper,
-      @AsyncExecutor FlushableWorkQueue postTransactionWork,
+      @AsyncExecutor GatedWorkQueue gatedWorkQueue,
       final CronJobStore.Mutable cronJobStore,
       final TaskStore.Mutable taskStore,
       final SchedulerStore.Mutable schedulerStore,
@@ -78,7 +79,7 @@ class DbStorage extends AbstractIdleService implements Storage {
 
     this.sessionFactory = requireNonNull(sessionFactory);
     this.enumValueMapper = requireNonNull(enumValueMapper);
-    this.postTransactionWork = requireNonNull(postTransactionWork);
+    this.gatedWorkQueue = requireNonNull(gatedWorkQueue);
     requireNonNull(cronJobStore);
     requireNonNull(taskStore);
     requireNonNull(schedulerStore);
@@ -140,13 +141,6 @@ class DbStorage extends AbstractIdleService implements Storage {
     }
   }
 
-  private final ThreadLocal<Boolean> inTransaction = new ThreadLocal<Boolean>() {
-    @Override
-    protected Boolean initialValue() {
-      return false;
-    }
-  };
-
   @Transactional
   <T, E extends Exception> T transactionedWrite(MutateWork<T, E> work) throws E {
     return work.apply(storeProvider);
@@ -155,28 +149,22 @@ class DbStorage extends AbstractIdleService implements Storage {
   @Timed("db_storage_write_operation")
   @Override
   public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
-    // Only flush for the top-level write() call when calls are reentrant.
-    boolean shouldFlush = !inTransaction.get();
-    if (shouldFlush) {
-      inTransaction.set(true);
-    }
-
-    try {
-      return transactionedWrite(work);
-    } catch (PersistenceException e) {
-      throw new StorageException(e.getMessage(), e);
-    } finally {
-      // NOTE: Async work is intentionally executed regardless of whether the transaction succeeded.
-      // Doing otherwise runs the risk of cross-talk between transactions and losing async tasks
-      // due to failure of an unrelated transaction.  This matches behavior prior to the
-      // introduction of DbStorage, but should be revisited.
-      // TODO(wfarner): Consider revisiting to execute async work only when the transaction is
-      // successful.
-      if (shouldFlush) {
-        postTransactionWork.flush();
-        inTransaction.set(false);
+    // NOTE: Async work is intentionally executed regardless of whether the transaction succeeded.
+    // Doing otherwise runs the risk of cross-talk between transactions and losing async tasks
+    // due to failure of an unrelated transaction.  This matches behavior prior to the
+    // introduction of DbStorage, but should be revisited.
+    // TODO(wfarner): Consider revisiting to execute async work only when the transaction is
+    // successful.
+    return gatedWorkQueue.closeDuring(new GatedOperation<T, E>() {
+      @Override
+      public T doWithGateClosed() throws E {
+        try {
+          return transactionedWrite(work);
+        } catch (PersistenceException e) {
+          throw new StorageException(e.getMessage(), e);
+        }
       }
-    }
+    });
   }
 
   @VisibleForTesting
@@ -191,20 +179,24 @@ class DbStorage extends AbstractIdleService implements Storage {
   public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
       throws StorageException, E {
 
-    // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk
-    // insert.
-    try (SqlSession session = sessionFactory.openSession(false)) {
-      try {
-        session.update(DISABLE_UNDO_LOG);
-        work.apply(storeProvider);
-      } catch (PersistenceException e) {
-        throw new StorageException(e.getMessage(), e);
-      } finally {
-        session.update(ENABLE_UNDO_LOG);
+    gatedWorkQueue.closeDuring(new GatedOperation<Void, E>() {
+      @Override
+      public Void doWithGateClosed() throws E {
+        // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk
+        // insert.
+        try (SqlSession session = sessionFactory.openSession(false)) {
+          try {
+            session.update(DISABLE_UNDO_LOG);
+            work.apply(storeProvider);
+          } catch (PersistenceException e) {
+            throw new StorageException(e.getMessage(), e);
+          } finally {
+            session.update(ENABLE_UNDO_LOG);
+          }
+        }
+        return null;
       }
-    } finally {
-      postTransactionWork.flush();
-    }
+    });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 4941128..a44b9da 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -72,8 +72,6 @@ import org.apache.aurora.gen.storage.Transaction;
 import org.apache.aurora.gen.storage.storageConstants;
 import org.apache.aurora.scheduler.AppStartup;
 import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.FlushableWorkQueue;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.log.Log.Entry;
 import org.apache.aurora.scheduler.log.Log.Position;
@@ -378,8 +376,6 @@ public class SchedulerIT extends BaseZooKeeperTest {
     scheduler.getValue().registered(driver,
         FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
         MasterInfo.getDefaultInstance());
-    // Registration is published on the event bus, which will be gated until a flush.
-    injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class)).flush();
 
     awaitSchedulerReady();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java b/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java
deleted file mode 100644
index 2867633..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expectLastCall;
-
-public class GatedDelayExecutorTest extends EasyMockTest {
-
-  private static final Amount<Long, Time> ONE_SECOND = Amount.of(1L, Time.SECONDS);
-
-  private ScheduledExecutorService mockExecutor;
-  private Runnable runnable;
-  private GatedDelayExecutor gatedExecutor;
-
-  @Before
-  public void setUp() {
-    mockExecutor = createMock(ScheduledExecutorService.class);
-    runnable = createMock(Runnable.class);
-    gatedExecutor = new GatedDelayExecutor(mockExecutor);
-  }
-
-  @Test
-  public void testNoFlush() {
-    control.replay();
-
-    gatedExecutor.execute(runnable);
-    // flush() is not called, so no work is performed.
-  }
-
-  private IExpectationSetters<?> invokeWorkWhenSubmitted() {
-    return expectLastCall().andAnswer(new IAnswer<Object>() {
-      @Override
-      public Object answer() {
-        ((Runnable) EasyMock.getCurrentArguments()[0]).run();
-        return null;
-      }
-    });
-  }
-
-  @Test
-  public void testExecute() {
-    mockExecutor.execute(EasyMock.<Runnable>anyObject());
-    invokeWorkWhenSubmitted();
-    runnable.run();
-    expectLastCall();
-
-    control.replay();
-
-    gatedExecutor.execute(runnable);
-    gatedExecutor.flush();
-  }
-
-  @Test
-  public void testExecuteAfterDelay() {
-    mockExecutor.schedule(
-        EasyMock.<Runnable>anyObject(),
-        eq(ONE_SECOND.getValue().longValue()),
-        eq(ONE_SECOND.getUnit().getTimeUnit()));
-    invokeWorkWhenSubmitted();
-    runnable.run();
-
-    control.replay();
-
-    gatedExecutor.execute(runnable, ONE_SECOND);
-    gatedExecutor.flush();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java b/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java
new file mode 100644
index 0000000..c62a1d5
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.async.GatedWorkQueue.GatedOperation;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+public class GatingDelayExecutorTest extends EasyMockTest {
+
+  private static final Amount<Long, Time> ONE_SECOND = Amount.of(1L, Time.SECONDS);
+
+  private ScheduledExecutorService gatedExecutor;
+  private Runnable runnable;
+  private GatingDelayExecutor gatingExecutor;
+
+  @Before
+  public void setUp() {
+    gatedExecutor = createMock(ScheduledExecutorService.class);
+    runnable = createMock(Runnable.class);
+    gatingExecutor = new GatingDelayExecutor(gatedExecutor);
+  }
+
+  @Test
+  public void testGateOpen() {
+    gatedExecutor.execute(runnable);
+
+    control.replay();
+
+    // The gate was not closed, so the work is handed to the gated executor immediately.
+    gatingExecutor.execute(runnable);
+  }
+
+  private IExpectationSetters<?> invokeWorkWhenSubmitted() {
+    return expectLastCall().andAnswer(new IAnswer<Object>() {
+      @Override
+      public Object answer() {
+        ((Runnable) EasyMock.getCurrentArguments()[0]).run();
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testGateIsThreadSpecific() throws InterruptedException {
+    gatedExecutor.execute(runnable);
+
+    control.replay();
+
+    CountDownLatch gateClosed = new CountDownLatch(1);
+    CountDownLatch unblock = new CountDownLatch(1);
+    Runnable closer = new Runnable() {
+      @Override
+      public void run() {
+        gatingExecutor.closeDuring(new GatedOperation<String, RuntimeException>() {
+          @Override
+          public String doWithGateClosed() {
+            gateClosed.countDown();
+            try {
+              unblock.await();
+            } catch (InterruptedException e) {
+              throw Throwables.propagate(e);
+            }
+            return "hi";
+          }
+        });
+      }
+    };
+    new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("GateTest")
+        .build()
+        .newThread(closer)
+        .start();
+
+    gateClosed.await();
+    gatingExecutor.execute(runnable);
+    assertQueueSize(0);
+    unblock.countDown();
+  }
+
+  private void assertQueueSize(int size) {
+    assertEquals(size, gatingExecutor.getQueueSize());
+  }
+
+  @Test
+  public void testReentrantClose() {
+    gatedExecutor.execute(runnable);
+    expectLastCall().times(3);
+
+    control.replay();
+
+    gatingExecutor.execute(runnable);
+    assertQueueSize(0);
+
+    String result = gatingExecutor.closeDuring(new GatedOperation<String, RuntimeException>() {
+      @Override
+      public String doWithGateClosed() {
+        gatingExecutor.execute(runnable);
+        assertQueueSize(1);
+
+        String result = gatingExecutor.closeDuring(new GatedOperation<String, RuntimeException>() {
+          @Override
+          public String doWithGateClosed() {
+            gatingExecutor.execute(runnable);
+            assertQueueSize(2);
+            return "hello";
+          }
+        });
+        assertEquals("hello", result);
+
+        return "hi";
+      }
+    });
+    assertEquals("hi", result);
+    assertQueueSize(0);
+  }
+
+  @Test
+  public void testExecute() {
+    gatedExecutor.execute(runnable);
+    invokeWorkWhenSubmitted();
+    runnable.run();
+    expectLastCall();
+
+    control.replay();
+
+    gatingExecutor.execute(runnable);
+  }
+
+  @Test
+  public void testExecuteAfterDelay() {
+    gatedExecutor.schedule(
+        runnable,
+        ONE_SECOND.getValue().longValue(),
+        ONE_SECOND.getUnit().getTimeUnit());
+    invokeWorkWhenSubmitted();
+    runnable.run();
+
+    control.replay();
+
+    gatingExecutor.execute(runnable, ONE_SECOND);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 0c7da07..acd2cd1 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -13,14 +13,19 @@
  */
 package org.apache.aurora.scheduler.pruning;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -44,7 +49,6 @@ import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.async.AsyncModule;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.async.DelayExecutor;
-import org.apache.aurora.scheduler.async.FlushableWorkQueue;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
@@ -56,6 +60,7 @@ import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -84,6 +89,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
   private StateManager stateManager;
   private StorageTestUtil storageUtil;
   private TaskHistoryPruner pruner;
+  private Closer closer;
 
   @Before
   public void setUp() {
@@ -98,6 +104,12 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
         clock,
         new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
         storageUtil.storage);
+    closer = Closer.create();
+  }
+
+  @After
+  public void tearDownCloser() throws Exception {  // Runs after each test method.
+    closer.close();  // Closes whatever the test registered with the Closer.
   }
 
   @Test
@@ -242,6 +254,13 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
             .setDaemon(true)
             .setNameFormat("testThreadSafeEvents-executor")
             .build());
+    closer.register(new Closeable() {
+      @Override
+      public void close() throws IOException {
+        MoreExecutors.shutdownAndAwaitTermination(realExecutor, 1L, TimeUnit.SECONDS);
+      }
+    });
+
     Injector injector = Guice.createInjector(
         new AsyncModule(realExecutor),
         new AbstractModule() {
@@ -251,24 +270,16 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
           }
         });
     executor = injector.getInstance(Key.get(DelayExecutor.class, AsyncExecutor.class));
-    FlushableWorkQueue flusher =
-        injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class));
 
     pruner = buildPruner(executor);
-    Command onDeleted = new Command() {
-      @Override
-      public void execute() {
-        // The goal is to verify that the call does not deadlock. We do not care about the outcome.
-        changeState(makeTask("b", ASSIGNED), STARTING);
-      }
-    };
+    // The goal is to verify that the call does not deadlock. We do not care about the outcome.
+    Command onDeleted = () -> changeState(makeTask("b", ASSIGNED), STARTING);
     CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
 
     control.replay();
 
     // Change the task to a terminal state and wait for it to be pruned.
     changeState(makeTask(TASK_ID, RUNNING), KILLED);
-    flusher.flush();
     taskDeleted.await();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
index 6dd5026..a0bd34b 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
@@ -13,11 +13,9 @@
  */
 package org.apache.aurora.scheduler.storage.db;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.async.FlushableWorkQueue;
+import org.apache.aurora.scheduler.async.GatedWorkQueue;
+import org.apache.aurora.scheduler.async.GatedWorkQueue.GatedOperation;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.CronJobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -34,20 +32,19 @@ import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 public class DbStorageTest extends EasyMockTest {
 
   private SqlSessionFactory sessionFactory;
   private SqlSession session;
   private EnumValueMapper enumMapper;
-  private FlushableWorkQueue flusher;
+  private GatedWorkQueue gatedWorkQueue;
   private Work.Quiet<String> readWork;
   private MutateWork.NoResult.Quiet writeWork;
 
@@ -58,14 +55,14 @@ public class DbStorageTest extends EasyMockTest {
     sessionFactory = createMock(SqlSessionFactory.class);
     session = createMock(SqlSession.class);
     enumMapper = createMock(EnumValueMapper.class);
-    flusher = createMock(FlushableWorkQueue.class);
+    gatedWorkQueue = createMock(GatedWorkQueue.class);
     readWork = createMock(new Clazz<Work.Quiet<String>>() { });
     writeWork = createMock(new Clazz<MutateWork.NoResult.Quiet>() { });
 
     storage = new DbStorage(
         sessionFactory,
         enumMapper,
-        flusher,
+        gatedWorkQueue,
         createMock(CronJobStore.Mutable.class),
         createMock(TaskStore.Mutable.class),
         createMock(SchedulerStore.Mutable.class),
@@ -94,12 +91,23 @@ public class DbStorageTest extends EasyMockTest {
     assertEquals("hi", storage.read(readWork));
   }
 
+  // Records a closeDuring() expectation answered by running the given operation inline.
+  private IExpectationSetters<?> expectGateClosed() throws Exception {
+    IAnswer<Object> runOperation = new IAnswer<Object>() {
+      @Override
+      public Object answer() throws Throwable {
+        return ((GatedOperation<?, ?>) EasyMock.getCurrentArguments()[0]).doWithGateClosed();
+      }
+    };
+    return expect(gatedWorkQueue.closeDuring(EasyMock.anyObject())).andAnswer(runOperation);
+  }
+
   @Test(expected = StorageException.class)
-  public void testBulkLoadFails() {
+  public void testBulkLoadFails() throws Exception {
     expect(sessionFactory.openSession(false)).andReturn(session);
     expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException());
     expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
-    flusher.flush();
+    expectGateClosed();
 
     control.replay();
 
@@ -107,13 +115,13 @@ public class DbStorageTest extends EasyMockTest {
   }
 
   @Test
-  public void testBulkLoad() {
+  public void testBulkLoad() throws Exception {
     expect(sessionFactory.openSession(false)).andReturn(session);
     expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andReturn(0);
     expect(writeWork.apply(EasyMock.anyObject())).andReturn(null);
     session.close();
     expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
-    flusher.flush();
+    expectGateClosed();
 
     control.replay();
 
@@ -121,16 +129,8 @@ public class DbStorageTest extends EasyMockTest {
   }
 
   @Test
-  public void testFlushWithReentrantWrites() {
-    final AtomicBoolean flushed = new AtomicBoolean(false);
-    flusher.flush();
-    expectLastCall().andAnswer(new IAnswer<Void>() {
-      @Override
-      public Void answer() {
-        flushed.set(true);
-        return null;
-      }
-    });
+  public void testGateWithReentrantWrites() throws Exception {
+    expectGateClosed().times(2);
 
     control.replay();
 
@@ -138,9 +138,6 @@ public class DbStorageTest extends EasyMockTest {
       @Override
       public void execute(MutableStoreProvider storeProvider) {
         noopWrite();
-
-        // Should not have flushed yet.
-        assertFalse("flush() should not be called until outer write() completes.", flushed.get());
       }
     });
   }
@@ -155,9 +152,8 @@ public class DbStorageTest extends EasyMockTest {
   }
 
   @Test
-  public void testFlushWithSeqentialWrites() {
-    flusher.flush();
-    expectLastCall().times(2);
+  public void testFlushWithSeqentialWrites() throws Exception {
+    expectGateClosed().times(2);
 
     control.replay();