You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/04/25 00:25:48 UTC

aurora git commit: Simplify storage bindings, remove shims that are now mostly getting in the way.

Repository: aurora
Updated Branches:
  refs/heads/master 7f3e4e3c5 -> 65df91bfd


Simplify storage bindings, remove shims that are now mostly getting in the way.

Reviewed at https://reviews.apache.org/r/33530/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/65df91bf
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/65df91bf
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/65df91bf

Branch: refs/heads/master
Commit: 65df91bfd7e3a2ada38a5fe4d620e6373d0f59bf
Parents: 7f3e4e3
Author: Bill Farner <wf...@apache.org>
Authored: Fri Apr 24 15:15:48 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Apr 24 15:15:48 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  |   4 +-
 .../aurora/benchmark/ThriftApiBenchmarks.java   |   5 +-
 .../aurora/benchmark/UpdateStoreBenchmarks.java |   5 +-
 .../aurora/scheduler/app/SchedulerMain.java     |  12 +-
 .../storage/backup/TemporaryStorage.java        |   4 +-
 .../aurora/scheduler/storage/db/DbModule.java   |   7 +
 .../aurora/scheduler/storage/db/DbStorage.java  |  17 +-
 .../aurora/scheduler/storage/db/DbUtil.java     |  42 +++
 .../scheduler/storage/db/MigrationModule.java   |  60 -----
 .../scheduler/storage/log/LogStorage.java       |  30 +--
 .../storage/mem/InMemStoresModule.java          |  52 ++++
 .../scheduler/storage/mem/MemStorage.java       | 163 ------------
 .../scheduler/storage/mem/MemStorageModule.java |  75 ------
 .../scheduler/app/local/LocalSchedulerMain.java |   3 +-
 .../scheduler/async/TaskSchedulerImplTest.java  |   4 +-
 .../scheduler/async/TaskSchedulerTest.java      |   4 +-
 .../cron/quartz/AuroraCronJobTest.java          |   8 +-
 .../aurora/scheduler/cron/quartz/CronIT.java    |   4 +-
 .../cron/quartz/CronJobManagerImplTest.java     |   4 +-
 .../scheduler/state/LockManagerImplTest.java    |   4 +-
 .../scheduler/state/StateManagerImplTest.java   |   4 +-
 .../scheduler/stats/ResourceCounterTest.java    |   4 +-
 .../scheduler/storage/StorageBackfillTest.java  |   4 +-
 .../scheduler/storage/db/DbStorageTest.java     |   4 +
 .../aurora/scheduler/storage/db/DbUtil.java     |  34 ---
 .../scheduler/storage/mem/MemStorageTest.java   | 257 ------------------
 .../scheduler/storage/mem/MemTaskStoreTest.java |  12 +-
 .../storage/mem/StorageTransactionTest.java     | 259 +++++++++++++++++++
 .../aurora/scheduler/updater/JobUpdaterIT.java  |   6 +-
 29 files changed, 413 insertions(+), 678 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 12f42a9..372addc 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -58,10 +58,10 @@ import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.mesos.ExecutorSettings;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -104,7 +104,7 @@ public class SchedulingBenchmarks {
      */
     @Setup(Level.Trial)
     public void setUpBenchmark() {
-      storage = MemStorage.newEmptyStorage();
+      storage = DbUtil.createStorage();
       eventBus = new EventBus();
       final FakeClock clock = new FakeClock();
       clock.setNowMillis(System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
index d20b088..6ec0e14 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
@@ -35,8 +35,6 @@ import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.db.DbModule;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.thrift.ThriftModule;
 import org.apache.thrift.TException;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -93,8 +91,7 @@ public class ThriftApiBenchmarks {
               bind(LockManager.class).toInstance(createThrowingFake(LockManager.class));
             }
           },
-          new MemStorageModule(Bindings.KeyFactory.PLAIN),
-          new DbModule(Bindings.annotatedKeyFactory(MemStorage.Delegated.class)),
+          DbModule.testModule(Bindings.KeyFactory.PLAIN),
           new ThriftModule.ReadOnly());
       api = injector.getInstance(ReadOnlyScheduler.Iface.class);
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
index c7456aa..a4abbd8 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
@@ -50,8 +50,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.thrift.TException;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -91,8 +89,7 @@ public class UpdateStoreBenchmarks {
               bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
             }
           },
-          new MemStorageModule(Bindings.KeyFactory.PLAIN),
-          new DbModule(Bindings.annotatedKeyFactory(MemStorage.Delegated.class)));
+          DbModule.testModule(Bindings.KeyFactory.PLAIN));
       storage = injector.getInstance(Storage.class);
       storage.prepare();
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 087abe5..3d19831 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -55,14 +55,11 @@ import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
 import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
 import org.apache.aurora.scheduler.mesos.ExecutorSettings;
 import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.backup.BackupModule;
 import org.apache.aurora.scheduler.storage.db.DbModule;
-import org.apache.aurora.scheduler.storage.db.MigrationModule;
-import org.apache.aurora.scheduler.storage.log.LogStorage;
 import org.apache.aurora.scheduler.storage.log.LogStorageModule;
 import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import org.apache.aurora.scheduler.storage.mem.MemStorage.Delegated;
-import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 
 import static com.twitter.common.logging.RootLogConfig.Configuration;
 
@@ -157,13 +154,8 @@ public class SchedulerMain extends AbstractApplication {
         .add(new AppModule(clusterName, serverSetPath, zkClientConfig, statsURLPrefix))
         .addAll(getExtraModules())
         .add(getPersistentStorageModule())
-        .add(new MemStorageModule(Bindings.annotatedKeyFactory(LogStorage.WriteBehind.class)))
         .add(new CronModule())
-        .add(new DbModule(Bindings.annotatedKeyFactory(Delegated.class)))
-        .add(new MigrationModule(
-            Bindings.annotatedKeyFactory(LogStorage.WriteBehind.class),
-            Bindings.annotatedKeyFactory(Delegated.class))
-        )
+        .add(new DbModule(Bindings.annotatedKeyFactory(Storage.Volatile.class)))
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 586b53b..23c0c1e 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -28,9 +28,9 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 
 /**
  * A short-lived in-memory storage system that can be converted to a {@link Snapshot}.
@@ -66,7 +66,7 @@ interface TemporaryStorage {
   class TemporaryStorageFactory implements Function<Snapshot, TemporaryStorage> {
     @Override
     public TemporaryStorage apply(Snapshot snapshot) {
-      final Storage storage = MemStorage.newEmptyStorage();
+      final Storage storage = DbUtil.createStorage();
       FakeClock clock = new FakeClock();
       clock.setNowMillis(snapshot.getTimestamp());
       final SnapshotStore<Snapshot> snapshotStore = new SnapshotStoreImpl(clock, storage);

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
index 439d8cc..d6ca430 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
@@ -25,12 +25,15 @@ import com.google.inject.PrivateModule;
 import com.twitter.common.inject.Bindings;
 
 import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.LockStore;
 import org.apache.aurora.scheduler.storage.QuotaStore;
 import org.apache.aurora.scheduler.storage.SchedulerStore;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.db.typehandlers.TypeHandlers;
+import org.apache.aurora.scheduler.storage.mem.InMemStoresModule;
 import org.apache.ibatis.session.AutoMappingBehavior;
 import org.apache.ibatis.session.SqlSessionFactory;
 import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
@@ -130,6 +133,10 @@ public class DbModule extends PrivateModule {
         addTypeHandlersClasses(TypeHandlers.getAll());
       }
     });
+    install(new InMemStoresModule(keyFactory));
+    expose(keyFactory.create(CronJobStore.Mutable.class));
+    expose(keyFactory.create(TaskStore.Mutable.class));
+
     bindStore(AttributeStore.Mutable.class, DbAttributeStore.class);
     bindStore(LockStore.Mutable.class, DbLockStore.class);
     bindStore(QuotaStore.Mutable.class, DbQuotaStore.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index 49db52d..fbdbb05 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.CharStreams;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.Inject;
+import com.twitter.common.inject.TimedInterceptor.Timed;
 
 import org.apache.aurora.gen.JobUpdateAction;
 import org.apache.aurora.gen.JobUpdateStatus;
@@ -49,11 +50,6 @@ import static org.apache.ibatis.mapping.SqlCommandType.UPDATE;
  * A storage implementation backed by a relational database.
  * <p>
  * Delegates read and write concurrency semantics to the underlying database.
- * This class is currently only partially implemented, with the underlying
- * {@link MutableStoreProvider} only providing some, but not all, store implementations. It is
- * designed to be a long term replacement for
- * {@link org.apache.aurora.scheduler.storage.mem.MemStorage}.
- * </p>
  */
 class DbStorage extends AbstractIdleService implements Storage {
 
@@ -65,6 +61,8 @@ class DbStorage extends AbstractIdleService implements Storage {
   DbStorage(
       SqlSessionFactory sessionFactory,
       EnumValueMapper enumValueMapper,
+      final CronJobStore.Mutable cronJobStore,
+      final TaskStore.Mutable taskStore,
       final SchedulerStore.Mutable schedulerStore,
       final AttributeStore.Mutable attributeStore,
       final LockStore.Mutable lockStore,
@@ -86,17 +84,17 @@ class DbStorage extends AbstractIdleService implements Storage {
 
       @Override
       public CronJobStore.Mutable getCronJobStore() {
-        throw new UnsupportedOperationException("Not yet implemented.");
+        return cronJobStore;
       }
 
       @Override
       public TaskStore getTaskStore() {
-        throw new UnsupportedOperationException("Not yet implemented.");
+        return taskStore;
       }
 
       @Override
       public TaskStore.Mutable getUnsafeTaskStore() {
-        throw new UnsupportedOperationException("Not yet implemented.");
+        return taskStore;
       }
 
       @Override
@@ -121,6 +119,7 @@ class DbStorage extends AbstractIdleService implements Storage {
     };
   }
 
+  @Timed("db_storage_read_operation")
   @Override
   @Transactional
   public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
@@ -131,6 +130,7 @@ class DbStorage extends AbstractIdleService implements Storage {
     }
   }
 
+  @Timed("db_storage_write_operation")
   @Override
   @Transactional
   public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
@@ -148,6 +148,7 @@ class DbStorage extends AbstractIdleService implements Storage {
 
   // TODO(wfarner): Including @Transactional here seems to render the UNDO_LOG changes useless,
   // resulting in no performance gain.  Figure out why.
+  @Timed("db_storage_bulk_load_operation")
   @Override
   public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
       throws StorageException, E {

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
new file mode 100644
index 0000000..eae1770
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.twitter.common.inject.Bindings;
+
+import org.apache.aurora.scheduler.storage.Storage;
+
+/**
+ * Utility class for creating ad-hoc storage instances.
+ */
+public final class DbUtil {
+
+  private DbUtil() {
+    // Utility class.
+  }
+
+  /**
+   * Creates a new, empty storage system.
+   *
+   * @return A new storage instance.
+   */
+  public static Storage createStorage() {
+    Injector injector = Guice.createInjector(DbModule.testModule(Bindings.KeyFactory.PLAIN));
+    Storage storage = injector.getInstance(Storage.class);
+    storage.prepare();
+    return storage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/db/MigrationModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/MigrationModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/MigrationModule.java
deleted file mode 100644
index a821de3..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/MigrationModule.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import com.google.inject.AbstractModule;
-import com.twitter.common.inject.Bindings.KeyFactory;
-
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.LockStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Temporary module to wire the two partial storage implementations together as we
- * migrate from MemStorage to DbStorage.  This accepts two {@link KeyFactory}s,
- * one that references the binding scope for the feature-complete write-behind
- * volatile storage system, and one for the binding scope of the new and partially-implemented
- * storage system.
- * <p>
- *  Once the new storage system is feature-complete, this module will be deleted
- *  as the binding bridge is no longer necessary.
- * </p>
- */
-public class MigrationModule extends AbstractModule {
-
-  private final KeyFactory toFactory;
-  private final KeyFactory fromFactory;
-
-  public MigrationModule(KeyFactory from, KeyFactory to) {
-    this.fromFactory = requireNonNull(from);
-    this.toFactory = requireNonNull(to);
-  }
-
-  private <T> void link(Class<T> clazz) {
-    bind(fromFactory.create(clazz)).to(toFactory.create(clazz));
-  }
-
-  @Override
-  protected void configure() {
-    link(AttributeStore.Mutable.class);
-    link(LockStore.Mutable.class);
-    link(QuotaStore.Mutable.class);
-    link(SchedulerStore.Mutable.class);
-    link(JobUpdateStore.Mutable.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index bb59cdf..c58f531 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -14,10 +14,6 @@
 package org.apache.aurora.scheduler.storage.log;
 
 import java.io.IOException;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
@@ -28,7 +24,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
-import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -212,15 +207,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
       new SlidingStats("log_storage_write_lock_wait", "ns");
   private final AtomicLong droppedUpdateEvents = Stats.exportLong("dropped_update_events");
 
-  /**
-   * Identifies a local storage layer that is written to only after first ensuring the write
-   * operation is persisted in the log.
-   */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
-  @Qualifier
-  public @interface WriteBehind { }
-
   private final Map<LogEntry._Fields, Closure<LogEntry>> logEntryReplayActions;
   private final Map<Op._Fields, Closure<Op>> transactionReplayActions;
 
@@ -230,14 +216,14 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
       ShutdownRegistry shutdownRegistry,
       Settings settings,
       SnapshotStore<Snapshot> snapshotStore,
-      @WriteBehind Storage storage,
-      @WriteBehind SchedulerStore.Mutable schedulerStore,
-      @WriteBehind CronJobStore.Mutable jobStore,
-      @WriteBehind TaskStore.Mutable taskStore,
-      @WriteBehind LockStore.Mutable lockStore,
-      @WriteBehind QuotaStore.Mutable quotaStore,
-      @WriteBehind AttributeStore.Mutable attributeStore,
-      @WriteBehind JobUpdateStore.Mutable jobUpdateStore,
+      @Volatile Storage storage,
+      @Volatile SchedulerStore.Mutable schedulerStore,
+      @Volatile CronJobStore.Mutable jobStore,
+      @Volatile TaskStore.Mutable taskStore,
+      @Volatile LockStore.Mutable lockStore,
+      @Volatile QuotaStore.Mutable quotaStore,
+      @Volatile AttributeStore.Mutable attributeStore,
+      @Volatile JobUpdateStore.Mutable jobUpdateStore,
       EventSink eventSink,
       ReentrantLock writeLock) {
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java
new file mode 100644
index 0000000..88fd006
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Key;
+import com.twitter.common.inject.Bindings.KeyFactory;
+
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.TaskStore;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Binding module for an in-memory storage system.
+ * <p>
+ * NOTE: These stores are being phased out in favor of database-backed stores.
+ */
+public final class InMemStoresModule extends AbstractModule {
+
+  private final KeyFactory keyFactory;
+
+  public InMemStoresModule(KeyFactory keyFactory) {
+    this.keyFactory = requireNonNull(keyFactory);
+  }
+
+  private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
+    bind(binding).to(impl);
+    bind(impl).in(Singleton.class);
+    Key<T> key = keyFactory.create(binding);
+    bind(key).to(impl);
+  }
+
+  @Override
+  protected void configure() {
+    bindStore(CronJobStore.Mutable.class, MemJobStore.class);
+    bindStore(TaskStore.Mutable.class, MemTaskStore.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
deleted file mode 100644
index c5ccccd..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.mem;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.twitter.common.inject.Bindings;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.LockStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.db.DbModule;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A storage implementation comprised of individual in-memory store implementations.
- */
-public class MemStorage implements Storage {
-  private final MutableStoreProvider storeProvider;
-  private final Storage delegatedStore;
-
-  /**
-   * Identifies a storage layer to be delegated to instead of mem storage.
-   */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
-  @Qualifier
-  public @interface Delegated { }
-
-  @VisibleForTesting
-  static final String THREADS_WAITING_GAUGE = "storage_lock_threads_waiting";
-
-  @Inject
-  MemStorage(
-      @Delegated final SchedulerStore.Mutable schedulerStore,
-      final CronJobStore.Mutable jobStore,
-      final TaskStore.Mutable taskStore,
-      @Delegated final LockStore.Mutable lockStore,
-      @Delegated final Storage delegated,
-      @Delegated final QuotaStore.Mutable quotaStore,
-      @Delegated final AttributeStore.Mutable attributeStore,
-      @Delegated final JobUpdateStore.Mutable updateStore) {
-
-    this.delegatedStore = requireNonNull(delegated);
-    storeProvider = new MutableStoreProvider() {
-      @Override
-      public SchedulerStore.Mutable getSchedulerStore() {
-        return schedulerStore;
-      }
-
-      @Override
-      public CronJobStore.Mutable getCronJobStore() {
-        return jobStore;
-      }
-
-      @Override
-      public TaskStore getTaskStore() {
-        return taskStore;
-      }
-
-      @Override
-      public TaskStore.Mutable getUnsafeTaskStore() {
-        return taskStore;
-      }
-
-      @Override
-      public LockStore.Mutable getLockStore() {
-        return lockStore;
-      }
-
-      @Override
-      public QuotaStore.Mutable getQuotaStore() {
-        return quotaStore;
-      }
-
-      @Override
-      public AttributeStore.Mutable getAttributeStore() {
-        return attributeStore;
-      }
-
-      @Override
-      public JobUpdateStore.Mutable getJobUpdateStore() {
-        return updateStore;
-      }
-    };
-  }
-
-  /**
-   * Creates a new empty in-memory storage for use in testing.
-   */
-  @VisibleForTesting
-  public static Storage newEmptyStorage() {
-    Injector injector = Guice.createInjector(
-        DbModule.testModule(Bindings.annotatedKeyFactory(Delegated.class)),
-        new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)));
-    Storage storage = injector.getInstance(Key.get(Storage.class, Volatile.class));
-    storage.prepare();
-    return storage;
-  }
-
-  @Timed("mem_storage_read_operation")
-  @Override
-  public <T, E extends Exception> T read(final Work<T, E> work) throws StorageException, E {
-    return delegatedStore.read(new Work<T, E>() {
-      @Override
-      public T apply(StoreProvider provider) throws E {
-        return work.apply(storeProvider);
-      }
-    });
-  }
-
-  @Timed("mem_storage_write_operation")
-  @Override
-  public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
-    return delegatedStore.write(new MutateWork<T, E>() {
-      @Override
-      public T apply(MutableStoreProvider provider) throws E {
-        return work.apply(storeProvider);
-      }
-    });
-  }
-
-  @Timed("mem_storage_bulk_load_operation")
-  @Override
-  public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
-      throws StorageException, E {
-
-    delegatedStore.bulkLoad(work);
-  }
-
-  @Override
-  public void prepare() throws StorageException {
-    delegatedStore.prepare();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
deleted file mode 100644
index 9068aa4..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.mem;
-
-import javax.inject.Singleton;
-
-import com.google.inject.Key;
-import com.google.inject.PrivateModule;
-import com.twitter.common.inject.Bindings.KeyFactory;
-
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.Volatile;
-import org.apache.aurora.scheduler.storage.TaskStore;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Binding module for an in-memory storage system.
- * <p>
- * Exposes bindings for storage components:
- * <ul>
- *   <li>{@link org.apache.aurora.scheduler.storage.Storage}</li>
- *   <li>Keyed with keys provided by the provided{@code keyFactory}:</li>
- *     <ul>
- *       <li>{@link org.apache.aurora.scheduler.storage.SchedulerStore}</li>
- *       <li>{@link org.apache.aurora.scheduler.storage.CronJobStore}</li>
- *       <li>{@link org.apache.aurora.scheduler.storage.TaskStore}</li>
- *       <li>{@link org.apache.aurora.scheduler.storage.LockStore}</li>
- *       <li>{@link org.apache.aurora.scheduler.storage.QuotaStore}</li>
- *       <li>{@link org.apache.aurora.scheduler.storage.AttributeStore}</li>
- *     </ul>
- * </ul>
- */
-public final class MemStorageModule extends PrivateModule {
-
-  private final KeyFactory keyFactory;
-
-  public MemStorageModule(KeyFactory keyFactory) {
-    this.keyFactory = requireNonNull(keyFactory);
-  }
-
-  private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
-    bind(binding).to(impl);
-    bind(impl).in(Singleton.class);
-    Key<T> key = keyFactory.create(binding);
-    bind(key).to(impl);
-    expose(key);
-  }
-
-  @Override
-  protected void configure() {
-    Key<Storage> storageKey = keyFactory.create(Storage.class);
-    bind(storageKey).to(MemStorage.class);
-    expose(storageKey);
-    Key<Storage> exposedMemStorageKey = Key.get(Storage.class, Volatile.class);
-    bind(exposedMemStorageKey).to(MemStorage.class);
-    expose(exposedMemStorageKey);
-    bind(MemStorage.class).in(Singleton.class);
-
-    bindStore(CronJobStore.Mutable.class, MemJobStore.class);
-    bindStore(TaskStore.Mutable.class, MemTaskStore.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index 4a8d404..a91c4a2 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -35,7 +35,6 @@ import org.apache.aurora.scheduler.mesos.DriverSettings;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.log.LogStorage;
 import org.apache.mesos.Protos;
 import org.apache.mesos.SchedulerDriver;
 import org.apache.shiro.io.ResourceUtils;
@@ -58,7 +57,7 @@ public class LocalSchedulerMain extends SchedulerMain {
     return new AbstractModule() {
       @Override
       protected void configure() {
-        bind(Storage.class).to(Key.get(Storage.class, LogStorage.WriteBehind.class));
+        bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class));
         bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
         bind(DistributedSnapshotStore.class).toInstance(new DistributedSnapshotStore() {
           @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index bd1aaa1..53b21cb 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -48,9 +48,9 @@ import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.mesos.Protos.TaskInfo;
 import org.easymock.Capture;
@@ -243,7 +243,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     // Ensures that tasks in THROTTLED state are not considered part of the active job state passed
     // to the assigner function.
 
-    Storage memStorage = MemStorage.newEmptyStorage();
+    Storage memStorage = DbUtil.createStorage();
 
     Injector injector = getInjector(memStorage);
     scheduler = injector.getInstance(TaskScheduler.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 9035484..f17c434 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -62,11 +62,11 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskID;
@@ -130,7 +130,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
   @Before
   public void setUp() {
-    storage = MemStorage.newEmptyStorage();
+    storage = DbUtil.createStorage();
     maintenance = createMock(MaintenanceController.class);
     stateManager = createMock(StateManager.class);
     assigner = createMock(TaskAssigner.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
index 91cf5ed..831803f 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -27,10 +27,10 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -54,7 +54,7 @@ public class AuroraCronJobTest extends EasyMockTest {
 
   @Before
   public void setUp() {
-    storage = MemStorage.newEmptyStorage();
+    storage = DbUtil.createStorage();
     stateManager = createMock(StateManager.class);
     backoffHelper = createMock(BackoffHelper.class);
 
@@ -94,11 +94,11 @@ public class AuroraCronJobTest extends EasyMockTest {
     control.replay();
     populateStorage(CronCollisionPolicy.CANCEL_NEW);
     auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-    storage = MemStorage.newEmptyStorage();
+    storage = DbUtil.createStorage();
 
     populateStorage(CronCollisionPolicy.KILL_EXISTING);
     auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-    storage = MemStorage.newEmptyStorage();
+    storage = DbUtil.createStorage();
 
     populateStorage(CronCollisionPolicy.RUN_OVERLAP);
     auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
index d4f8b5b..863e9c9 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -33,9 +33,9 @@ import org.apache.aurora.scheduler.cron.CrontabEntry;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
@@ -83,7 +83,7 @@ public class CronIT extends EasyMockTest {
   @Before
   public void setUp() throws Exception {
     stateManager = createMock(StateManager.class);
-    storage = MemStorage.newEmptyStorage();
+    storage = DbUtil.createStorage();
     auroraCronJob = createMock(AuroraCronJob.class);
 
     injector = Guice.createInjector(

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
index d313326..abb915d 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
@@ -29,9 +29,9 @@ import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CrontabEntry;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,7 +59,7 @@ public class CronJobManagerImplTest extends EasyMockTest {
 
   @Before
   public void setUp() {
-    storage = MemStorage.newEmptyStorage();
+    storage = DbUtil.createStorage();
     scheduler = createMock(Scheduler.class);
 
     cronJobManager = new CronJobManagerImpl(storage, scheduler, TimeZone.getTimeZone("GMT"));

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
index 2f14205..9c9cf1b 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
@@ -30,10 +30,10 @@ import org.apache.aurora.gen.Lock;
 import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
@@ -71,7 +71,7 @@ public class LockManagerImplTest extends EasyMockTest {
     tokenGenerator = createMock(UUIDGenerator.class);
     expect(tokenGenerator.createNew()).andReturn(TOKEN).anyTimes();
 
-    lockManager = new LockManagerImpl(MemStorage.newEmptyStorage(), clock, tokenGenerator);
+    lockManager = new LockManagerImpl(DbUtil.createStorage(), clock, tokenGenerator);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index c7fd3e5..afb7db8 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -47,10 +47,10 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.mesos.Protos.SlaveID;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -103,7 +103,7 @@ public class StateManagerImplTest extends EasyMockTest {
     eventSink = createMock(EventSink.class);
     rescheduleCalculator = createMock(RescheduleCalculator.class);
     // TODO(William Farner): Use a mocked storage.
-    storage = MemStorage.newEmptyStorage();
+    storage = DbUtil.createStorage();
     stateManager = new StateManagerImpl(
         clock,
         driver,

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
index 82e9c76..7b12d64 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
@@ -33,11 +33,11 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -72,7 +72,7 @@ public class ResourceCounterTest {
 
   @Before
   public void setUp() throws Exception {
-    storage = MemStorage.newEmptyStorage();
+    storage = DbUtil.createStorage();
     resourceCounter = new ResourceCounter(storage);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
index 254b231..5ad0de7 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
@@ -27,9 +27,9 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -48,7 +48,7 @@ public class StorageBackfillTest {
 
   @Before
   public void setUp() {
-    storage = MemStorage.newEmptyStorage();
+    storage = DbUtil.createStorage();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
index 743f5ba..b24fef9 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.storage.db;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.LockStore;
 import org.apache.aurora.scheduler.storage.QuotaStore;
@@ -25,6 +26,7 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.ibatis.exceptions.PersistenceException;
 import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
@@ -56,6 +58,8 @@ public class DbStorageTest extends EasyMockTest {
     storage = new DbStorage(
         sessionFactory,
         enumMapper,
+        createMock(CronJobStore.Mutable.class),
+        createMock(TaskStore.Mutable.class),
         createMock(SchedulerStore.Mutable.class),
         createMock(AttributeStore.Mutable.class),
         createMock(LockStore.Mutable.class),

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
deleted file mode 100644
index 1eaf3fe..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.twitter.common.inject.Bindings;
-
-import org.apache.aurora.scheduler.storage.Storage;
-
-final class DbUtil {
-
-  private DbUtil() {
-    // Utility class.
-  }
-
-  static Storage createStorage() {
-    Injector injector = Guice.createInjector(DbModule.testModule(Bindings.KeyFactory.PLAIN));
-    Storage storage = injector.getInstance(Storage.class);
-    storage.prepare();
-    return storage;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
deleted file mode 100644
index 30e2328..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.mem;
-
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.testing.TearDown;
-import com.google.common.testing.junit4.TearDownTestCase;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * TODO(William Farner): Wire a mechanism to allow verification of synchronized writers.
- */
-public class MemStorageTest extends TearDownTestCase {
-
-  private ExecutorService executor;
-  private Storage storage;
-
-  @Before
-  public void setUp() {
-    executor = Executors.newCachedThreadPool(
-        new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build());
-    addTearDown(new TearDown() {
-      @Override
-      public void tearDown() {
-        new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
-      }
-    });
-    storage = MemStorage.newEmptyStorage();
-  }
-
-  @Test
-  public void testConcurrentReaders() throws Exception {
-    // Validate that a slow read does not block another read.
-
-    final CountDownLatch slowReadStarted = new CountDownLatch(1);
-    final CountDownLatch slowReadFinished = new CountDownLatch(1);
-
-    Future<String> future = executor.submit(new Callable<String>() {
-      @Override
-      public String call() throws Exception {
-        return storage.read(new Work.Quiet<String>() {
-          @Override
-          public String apply(StoreProvider storeProvider) {
-            slowReadStarted.countDown();
-            try {
-              slowReadFinished.await();
-            } catch (InterruptedException e) {
-              fail(e.getMessage());
-            }
-            return "slowResult";
-          }
-        });
-      }
-    });
-
-    slowReadStarted.await();
-
-    String fastResult = storage.read(new Work.Quiet<String>() {
-      @Override
-      public String apply(StoreProvider storeProvider) {
-        return "fastResult";
-      }
-    });
-    assertEquals("fastResult", fastResult);
-    slowReadFinished.countDown();
-    assertEquals("slowResult", future.get());
-  }
-
-  private IScheduledTask makeTask(String taskId) {
-    return IScheduledTask.build(new ScheduledTask().setAssignedTask(
-        new AssignedTask()
-            .setTaskId(taskId)
-            .setTask(new TaskConfig()
-                .setOwner(new Identity().setRole("owner-" + taskId))
-                .setJobName("job-" + taskId)
-                .setEnvironment("env-" + taskId))));
-  }
-
-  private static class CustomException extends RuntimeException {
-  }
-
-  private <T, E extends RuntimeException> void expectWriteFail(MutateWork<T, E> work) {
-    try {
-      storage.write(work);
-      fail("Expected a CustomException.");
-    } catch (CustomException e) {
-      // Expected.
-    }
-  }
-
-  private void expectTasks(final String... taskIds) {
-    storage.read(new Work.Quiet<Void>() {
-      @Override
-      public Void apply(StoreProvider storeProvider) {
-        Query.Builder query = Query.unscoped();
-        Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
-            .transform(Tasks.SCHEDULED_TO_ID)
-            .toSet();
-        assertEquals(ImmutableSet.<String>builder().add(taskIds).build(), ids);
-        return null;
-      }
-    });
-  }
-
-  @Test
-  public void testWritesUnderTransaction() {
-    final IResourceAggregate quota = IResourceAggregate
-            .build(new ResourceAggregate().setDiskMb(100).setNumCpus(2.0).setRamMb(512));
-
-    try {
-      storage.write(new MutateWork.NoResult.Quiet() {
-        @Override
-        protected void execute(MutableStoreProvider storeProvider) {
-          storeProvider.getQuotaStore().saveQuota("a", quota);
-          throw new CustomException();
-        }
-      });
-      fail("Expected CustomException to be thrown.");
-    } catch (CustomException e) {
-      // Expected
-    }
-
-    storage.read(new Work.Quiet<Void>() {
-      @Override
-      public Void apply(StoreProvider storeProvider) {
-        // If the previous write was under a transaction then there would be no quota records.
-        assertEquals(ImmutableMap.<String, IResourceAggregate>of(),
-            storeProvider.getQuotaStore().fetchQuotas());
-        return null;
-      }
-    });
-  }
-
-  @Test
-  public void testOperations() {
-    expectWriteFail(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
-        throw new CustomException();
-      }
-    });
-    expectTasks("a", "b");
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
-      }
-    });
-    expectTasks("a", "b");
-
-    expectWriteFail(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().deleteAllTasks();
-        throw new CustomException();
-      }
-    });
-    expectTasks();
-
-    expectWriteFail(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a")));
-        throw new CustomException();
-      }
-    });
-    expectTasks("a");
-    storage.read(new Work.Quiet<Void>() {
-      @Override
-      public Void apply(StoreProvider storeProvider) {
-        assertEquals(
-            makeTask("a"),
-            Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(
-                Query.taskScoped("a"))));
-        return null;
-      }
-    });
-
-    // Nested transaction where inner transaction fails.
-    expectWriteFail(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c")));
-        storage.write(new MutateWork.NoResult.Quiet() {
-          @Override
-          protected void execute(MutableStoreProvider storeProvider) {
-            storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")));
-            throw new CustomException();
-          }
-        });
-      }
-    });
-    expectTasks("a", "c", "d");
-
-    // Nested transaction where outer transaction fails.
-    expectWriteFail(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c")));
-        storage.write(new MutateWork.NoResult.Quiet() {
-          @Override
-          protected void execute(MutableStoreProvider storeProvider) {
-            storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")));
-          }
-        });
-        throw new CustomException();
-      }
-    });
-    expectTasks("a", "c", "d");
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
index 20c9204..688a02f 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
@@ -25,10 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.twitter.common.inject.Bindings;
-import com.twitter.common.inject.Bindings.KeyFactory;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
@@ -48,7 +44,7 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
-import org.apache.aurora.scheduler.storage.db.DbModule;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -88,11 +84,7 @@ public class MemTaskStoreTest {
 
   @Before
   public void setUp() {
-    Injector injector = Guice.createInjector(
-        new MemStorageModule(KeyFactory.PLAIN),
-        DbModule.testModule(Bindings.annotatedKeyFactory(MemStorage.Delegated.class)));
-    storage = injector.getInstance(Storage.class);
-    storage.prepare();
+    storage = DbUtil.createStorage();
 
     storage.write(new Storage.MutateWork.NoResult.Quiet() {
       @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
new file mode 100644
index 0000000..bad9eb5
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.testing.TearDown;
+import com.google.common.testing.junit4.TearDownTestCase;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * TODO(William Farner): Wire a mechanism to allow verification of synchronized writers.
+ * TODO(wfarner): Merge this with DbStorageTest.
+ */
+public class StorageTransactionTest extends TearDownTestCase {
+
+  private ExecutorService executor;
+  private Storage storage;
+
+  @Before
+  public void setUp() {
+    executor = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build());
+    addTearDown(new TearDown() {
+      @Override
+      public void tearDown() {
+        new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
+      }
+    });
+    storage = DbUtil.createStorage();
+  }
+
+  @Test
+  public void testConcurrentReaders() throws Exception {
+    // Validate that a slow read does not block another read.
+
+    final CountDownLatch slowReadStarted = new CountDownLatch(1);
+    final CountDownLatch slowReadFinished = new CountDownLatch(1);
+
+    Future<String> future = executor.submit(new Callable<String>() {
+      @Override
+      public String call() throws Exception {
+        return storage.read(new Work.Quiet<String>() {
+          @Override
+          public String apply(StoreProvider storeProvider) {
+            slowReadStarted.countDown();
+            try {
+              slowReadFinished.await();
+            } catch (InterruptedException e) {
+              fail(e.getMessage());
+            }
+            return "slowResult";
+          }
+        });
+      }
+    });
+
+    slowReadStarted.await();
+
+    String fastResult = storage.read(new Work.Quiet<String>() {
+      @Override
+      public String apply(StoreProvider storeProvider) {
+        return "fastResult";
+      }
+    });
+    assertEquals("fastResult", fastResult);
+    slowReadFinished.countDown();
+    assertEquals("slowResult", future.get());
+  }
+
+  private IScheduledTask makeTask(String taskId) {
+    return IScheduledTask.build(new ScheduledTask().setAssignedTask(
+        new AssignedTask()
+            .setTaskId(taskId)
+            .setTask(new TaskConfig()
+                .setOwner(new Identity().setRole("owner-" + taskId))
+                .setJobName("job-" + taskId)
+                .setEnvironment("env-" + taskId))));
+  }
+
+  private static class CustomException extends RuntimeException {
+  }
+
+  private <T, E extends RuntimeException> void expectWriteFail(MutateWork<T, E> work) {
+    try {
+      storage.write(work);
+      fail("Expected a CustomException.");
+    } catch (CustomException e) {
+      // Expected.
+    }
+  }
+
+  private void expectTasks(final String... taskIds) {
+    storage.read(new Work.Quiet<Void>() {
+      @Override
+      public Void apply(StoreProvider storeProvider) {
+        Query.Builder query = Query.unscoped();
+        Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
+            .transform(Tasks.SCHEDULED_TO_ID)
+            .toSet();
+        assertEquals(ImmutableSet.<String>builder().add(taskIds).build(), ids);
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testWritesUnderTransaction() {
+    final IResourceAggregate quota = IResourceAggregate
+            .build(new ResourceAggregate().setDiskMb(100).setNumCpus(2.0).setRamMb(512));
+
+    try {
+      storage.write(new MutateWork.NoResult.Quiet() {
+        @Override
+        protected void execute(MutableStoreProvider storeProvider) {
+          storeProvider.getQuotaStore().saveQuota("a", quota);
+          throw new CustomException();
+        }
+      });
+      fail("Expected CustomException to be thrown.");
+    } catch (CustomException e) {
+      // Expected
+    }
+
+    storage.read(new Work.Quiet<Void>() {
+      @Override
+      public Void apply(StoreProvider storeProvider) {
+        // If the previous write was under a transaction then there would be no quota records.
+        assertEquals(ImmutableMap.<String, IResourceAggregate>of(),
+            storeProvider.getQuotaStore().fetchQuotas());
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testOperations() {
+    expectWriteFail(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
+        throw new CustomException();
+      }
+    });
+    expectTasks("a", "b");
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
+      }
+    });
+    expectTasks("a", "b");
+
+    expectWriteFail(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().deleteAllTasks();
+        throw new CustomException();
+      }
+    });
+    expectTasks();
+
+    expectWriteFail(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a")));
+        throw new CustomException();
+      }
+    });
+    expectTasks("a");
+    storage.read(new Work.Quiet<Void>() {
+      @Override
+      public Void apply(StoreProvider storeProvider) {
+        assertEquals(
+            makeTask("a"),
+            Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(
+                Query.taskScoped("a"))));
+        return null;
+      }
+    });
+
+    // Nested transaction where inner transaction fails.
+    expectWriteFail(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c")));
+        storage.write(new MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(MutableStoreProvider storeProvider) {
+            storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")));
+            throw new CustomException();
+          }
+        });
+      }
+    });
+    expectTasks("a", "c", "d");
+
+    // Nested transaction where outer transaction fails.
+    expectWriteFail(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c")));
+        storage.write(new MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(MutableStoreProvider storeProvider) {
+            storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")));
+          }
+        });
+        throw new CustomException();
+      }
+    });
+    expectTasks("a", "c", "d");
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 802c090..010e75f 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -33,7 +33,6 @@ import com.google.common.eventbus.EventBus;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import com.twitter.common.inject.Bindings;
 import com.twitter.common.inject.Bindings.KeyFactory;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
@@ -92,8 +91,6 @@ import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage.Delegated;
-import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.apache.aurora.scheduler.updater.JobUpdateController.AuditData;
@@ -174,8 +171,7 @@ public class JobUpdaterIT extends EasyMockTest {
 
     Injector injector = Guice.createInjector(
         new UpdaterModule(executor),
-        DbModule.testModule(Bindings.annotatedKeyFactory(Delegated.class)),
-        new MemStorageModule(KeyFactory.PLAIN),
+        DbModule.testModule(KeyFactory.PLAIN),
         new AbstractModule() {
           @Override
           protected void configure() {