You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/03/08 19:11:06 UTC

aurora git commit: Implementing db snapshotting

Repository: aurora
Updated Branches:
  refs/heads/master a91a759d0 -> 26efe5517


Implementing db snapshotting

Bugs closed: AURORA-1627

Reviewed at https://reviews.apache.org/r/44471/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/26efe551
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/26efe551
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/26efe551

Branch: refs/heads/master
Commit: 26efe5517fc0cb471101fdcb072e5dbf5d20bc56
Parents: a91a759
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Tue Mar 8 10:10:45 2016 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Tue Mar 8 10:10:45 2016 -0800

----------------------------------------------------------------------
 NEWS                                            |   2 +
 .../thrift/org/apache/aurora/gen/storage.thrift |   3 +
 config/checkstyle/checkstyle.xml                |   3 -
 .../org/apache/aurora/benchmark/JobUpdates.java |  50 +++-
 .../aurora/benchmark/SnapshotBenchmarks.java    |  69 ++---
 .../aurora/benchmark/UpdateStoreBenchmarks.java |  55 +---
 .../aurora/scheduler/storage/Storage.java       |  13 +
 .../storage/backup/TemporaryStorage.java        |   5 +-
 .../aurora/scheduler/storage/db/DbModule.java   |   4 +-
 .../aurora/scheduler/storage/db/DbStorage.java  |   6 +
 .../scheduler/storage/log/LogStorageModule.java |   6 +
 .../storage/log/SnapshotStoreImpl.java          | 175 +++++++++++--
 .../storage/log/WriteAheadStorage.java          |   6 +
 .../scheduler/storage/backup/RecoveryTest.java  |  12 +-
 .../storage/log/SnapshotStoreImplIT.java        | 262 +++++++++++++++++++
 .../storage/log/SnapshotStoreImplTest.java      | 196 --------------
 .../sh/org/apache/aurora/e2e/test_end_to_end.sh |   1 +
 17 files changed, 549 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/NEWS
----------------------------------------------------------------------
diff --git a/NEWS b/NEWS
index b84a945..7905451 100644
--- a/NEWS
+++ b/NEWS
@@ -14,6 +14,8 @@ Deprecations and removals:
   - `TaskConfig.jobName`
   - `TaskQuery.owner`
 - Task ID strings are no longer prefixed by a timestamp.
+- The scheduler's H2 in-memory database now uses MVStore (http://www.h2database.com/html/mvstore.html).
+  In addition, scheduler thrift snapshots now support full DB dumps for faster restarts.
 
 0.12.0
 ------

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
index 6dc4614..9e4213f 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -149,6 +149,9 @@ struct Snapshot {
   8: set<QuotaConfiguration> quotaConfigurations
   9: set<api.Lock> locks
   10: set<StoredJobUpdateDetails> jobUpdateDetails
+  11: list<string> dbScript
+  // Indicates if experimental DB store for tasks and cron jobs was enabled when snapshot was cut.
+  12: bool experimentalTaskStore
 }
 
 // A message header that calls out the number of expected FrameChunks to follow to form a complete

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/config/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml
index 2074beb..abc0760 100644
--- a/config/checkstyle/checkstyle.xml
+++ b/config/checkstyle/checkstyle.xml
@@ -234,9 +234,6 @@ limitations under the License.
     <module name="NestedForDepth">
       <property name="max" value="2"/>
     </module>
-    <module name="NestedTryDepth">
-      <property name="max" value="1"/>
-    </module>
     <module name="NoClone"/>
     <module name="NoFinalizer"/>
     <module name="SuperClone"/>

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
index 50044e1..f4f8d00 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
@@ -17,6 +17,7 @@ import java.util.Arrays;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
@@ -32,19 +33,66 @@ import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Lock;
+import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.ILock;
 
 /**
  * Job update factory.
  */
 final class JobUpdates {
+  private JobUpdates() {
+    // Utility class.
+  }
+
+  /**
+   * Saves job updates into provided storage.
+   *
+   * @param storage {@link Storage} instance.
+   * @param updates updates to save.
+   * @return update keys.
+   */
+  static Set<IJobUpdateKey> saveUpdates(Storage storage, Iterable<IJobUpdateDetails> updates) {
+    ImmutableSet.Builder<IJobUpdateKey> keyBuilder = ImmutableSet.builder();
+    storage.write((Storage.MutateWork.NoResult.Quiet) store -> {
+      JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
+      updateStore.deleteAllUpdatesAndEvents();
+      for (IJobUpdateDetails details : updates) {
+        IJobUpdateKey key = details.getUpdate().getSummary().getKey();
+        keyBuilder.add(key);
+        String lockToken = UUID.randomUUID().toString();
+        store.getLockStore().saveLock(ILock.build(new Lock()
+            .setKey(LockKey.job(key.getJob().newBuilder()))
+            .setToken(lockToken)
+            .setUser(Builder.USER)
+            .setTimestampMs(0L)));
+
+        updateStore.saveJobUpdate(details.getUpdate(), Optional.of(lockToken));
+
+        for (IJobUpdateEvent updateEvent : details.getUpdateEvents()) {
+          updateStore.saveJobUpdateEvent(key, updateEvent);
+        }
+
+        for (IJobInstanceUpdateEvent instanceEvent : details.getInstanceEvents()) {
+          updateStore.saveJobInstanceUpdateEvent(key, instanceEvent);
+        }
+      }
+    });
+    return keyBuilder.build();
+  }
 
   static final class Builder {
-    private static final String USER = "user";
+    static final String USER = "user";
     private int numEvents = 1;
     private int numInstanceEvents = 5000;
     private int numInstanceOverrides = 1;

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
index ca484fa..2c56b2e 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
@@ -13,32 +13,26 @@
  */
 package org.apache.aurora.benchmark;
 
-import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import javax.inject.Singleton;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
 
 import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
 import org.apache.aurora.common.inject.Bindings;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.db.DbModule;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.ExperimentalTaskStore;
 import org.apache.thrift.TException;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -68,6 +62,7 @@ public class SnapshotBenchmarks {
   public static class RestoreSnapshotWithUpdatesBenchmark {
     private SnapshotStoreImpl snapshotStore;
     private Snapshot snapshot;
+    private Storage storage;
 
     @Param({"1", "5", "10"})
     private int updateCount;
@@ -88,44 +83,34 @@ public class SnapshotBenchmarks {
       // Return non-guessable result to satisfy "blackhole" requirement.
       return System.currentTimeMillis() % 5 == 0;
     }
-  }
-
-  private static SnapshotStoreImpl getSnapshotStore() {
-    Bindings.KeyFactory keyFactory = Bindings.annotatedKeyFactory(Storage.Volatile.class);
-    Injector injector = Guice.createInjector(
-        new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
-            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
-            bind(SnapshotStoreImpl.class).in(Singleton.class);
-          }
-        },
-        DbModule.testModule(keyFactory, Optional.of(new DbModule.TaskStoreModule(keyFactory))));
 
-    Storage storage = injector.getInstance(Key.get(Storage.class, Storage.Volatile.class));
-    storage.prepare();
-    return injector.getInstance(SnapshotStoreImpl.class);
-  }
+    private SnapshotStoreImpl getSnapshotStore() {
+      Bindings.KeyFactory keyFactory = Bindings.annotatedKeyFactory(Storage.Volatile.class);
+      Injector injector = Guice.createInjector(
+          new AbstractModule() {
+            @Override
+            protected void configure() {
+              bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
+              bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+              bind(SnapshotStoreImpl.class).in(Singleton.class);
+              bind(new TypeLiteral<Boolean>() { }).annotatedWith(ExperimentalTaskStore.class)
+                  .toInstance(true);
+            }
+          },
+          DbModule.testModule(keyFactory, Optional.of(new DbModule.TaskStoreModule(keyFactory))));
 
-  private static Snapshot createSnapshot(int updates, int events, int instanceEvents) {
-    Set<IJobUpdateDetails> updateDetails = new JobUpdates.Builder()
-        .setNumEvents(events)
-        .setNumInstanceEvents(instanceEvents)
-        .build(updates);
+      storage = injector.getInstance(Key.get(Storage.class, Storage.Volatile.class));
+      storage.prepare();
+      return injector.getInstance(SnapshotStoreImpl.class);
+    }
 
-    ImmutableSet.Builder<Lock> lockBuilder = ImmutableSet.builder();
-    ImmutableSet.Builder<StoredJobUpdateDetails> detailsBuilder = ImmutableSet.builder();
-    for (IJobUpdateDetails details : updateDetails) {
-      IJobUpdateKey key = details.getUpdate().getSummary().getKey();
-      String lockToken = UUID.randomUUID().toString();
+    private Snapshot createSnapshot(int updates, int events, int instanceEvents) {
+      JobUpdates.saveUpdates(storage, new JobUpdates.Builder()
+          .setNumEvents(events)
+          .setNumInstanceEvents(instanceEvents)
+          .build(updates));
 
-      lockBuilder.add(new Lock(LockKey.job(key.getJob().newBuilder()), lockToken, "user", 0L));
-      detailsBuilder.add(new StoredJobUpdateDetails(details.newBuilder(), lockToken));
+      return snapshotStore.createSnapshot();
     }
-
-    return new Snapshot()
-        .setLocks(lockBuilder.build())
-        .setJobUpdateDetails(detailsBuilder.build());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
index 92849d9..e5228ae 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
@@ -14,24 +14,15 @@
 package org.apache.aurora.benchmark;
 
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.db.DbUtil;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.thrift.TException;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -48,8 +39,6 @@ import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
 
 public class UpdateStoreBenchmarks {
-  private static final String USER = "user";
-
   @BenchmarkMode(Mode.Throughput)
   @OutputTimeUnit(TimeUnit.SECONDS)
   @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
@@ -70,12 +59,9 @@ public class UpdateStoreBenchmarks {
 
     @Setup(Level.Iteration)
     public void setUpIteration() {
-      storage.write((NoResult.Quiet) storeProvider -> {
-        Set<IJobUpdateDetails> updates =
-            new JobUpdates.Builder().setNumInstanceEvents(instances).build(1);
-
-        keys = saveToStore(updates, storeProvider);
-      });
+      keys = JobUpdates.saveUpdates(
+          storage,
+          new JobUpdates.Builder().setNumInstanceEvents(instances).build(1));
     }
 
     @TearDown(Level.Iteration)
@@ -113,12 +99,9 @@ public class UpdateStoreBenchmarks {
 
     @Setup(Level.Iteration)
     public void setUpIteration() {
-      storage.write((NoResult.Quiet) storeProvider -> {
-        Set<IJobUpdateDetails> updates =
-            new JobUpdates.Builder().setNumInstanceOverrides(instanceOverrides).build(1);
-
-        keys = saveToStore(updates, storeProvider);
-      });
+      keys = JobUpdates.saveUpdates(
+          storage,
+          new JobUpdates.Builder().setNumInstanceOverrides(instanceOverrides).build(1));
     }
 
     @TearDown(Level.Iteration)
@@ -135,30 +118,4 @@ public class UpdateStoreBenchmarks {
           Iterables.getOnlyElement(keys)).get());
     }
   }
-
-  private static Set<IJobUpdateKey> saveToStore(
-      Set<IJobUpdateDetails> updates,
-      Storage.MutableStoreProvider storeProvider) {
-
-    JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
-    ImmutableSet.Builder<IJobUpdateKey> keyBuilder = ImmutableSet.builder();
-    for (IJobUpdateDetails details : updates) {
-      IJobUpdateKey key = details.getUpdate().getSummary().getKey();
-      keyBuilder.add(key);
-      String lockToken = UUID.randomUUID().toString();
-      storeProvider.getLockStore().saveLock(
-          ILock.build(new Lock(LockKey.job(key.getJob().newBuilder()), lockToken, USER, 0L)));
-
-      updateStore.saveJobUpdate(details.getUpdate(), Optional.of(lockToken));
-
-      for (IJobUpdateEvent updateEvent : details.getUpdateEvents()) {
-        updateStore.saveJobUpdateEvent(key, updateEvent);
-      }
-
-      for (IJobInstanceUpdateEvent instanceEvent : details.getInstanceEvents()) {
-        updateStore.saveJobInstanceUpdateEvent(key, instanceEvent);
-      }
-    }
-    return keyBuilder.build();
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 5124d17..859c964 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -65,6 +65,19 @@ public interface Storage {
     QuotaStore.Mutable getQuotaStore();
     AttributeStore.Mutable getAttributeStore();
     JobUpdateStore.Mutable getJobUpdateStore();
+
+    /**
+     * Gets direct low level access to the underlying storage.
+     * <p>
+     * This grants potentially dangerous direct access to the underlying storage and should
+     * only be used during storage initialization, when unstructured bulk data manipulation
+     * is required.
+     * </p>
+     *
+     * @param <T> Direct access type.
+     * @return Direct read/write accessor to the storage.
+     */
+    <T> T getUnsafeStoreAccess();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 46b3d10..5c7d92f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -73,7 +73,10 @@ interface TemporaryStorage {
       final SnapshotStore<Snapshot> snapshotStore = new SnapshotStoreImpl(
           buildInfo,
           clock,
-          storage);
+          storage,
+          // Safe to pass false here to default to the non-experimental task store
+          // during restore from backup procedure.
+          false /** useDbSnapshotForTaskStore */);
       snapshotStore.applySnapshot(snapshot);
 
       return new TemporaryStorage() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
index 6d8fa11..ff663fa 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
@@ -68,7 +68,7 @@ public final class DbModule extends PrivateModule {
 
   @CmdLine(name = "use_beta_db_task_store",
       help = "Whether to use the experimental database-backed task store.")
-  private static final Arg<Boolean> USE_DB_TASK_STORE = Arg.create(false);
+  public static final Arg<Boolean> USE_DB_TASK_STORE = Arg.create(false);
 
   @CmdLine(name = "slow_query_log_threshold",
       help = "Log all queries that take at least this long to execute.")
@@ -115,8 +115,6 @@ public final class DbModule extends PrivateModule {
 
     Map<String, String> args = ImmutableMap.<String, String>builder()
         .putAll(jdbcUriArgs)
-        // We always disable the MvStore, as it is in beta as of this writing.
-        .put("MV_STORE", "false")
         // READ COMMITTED transaction isolation.  More details here
         // http://www.h2database.com/html/advanced.html?#transaction_isolation
         .put("LOCK_MODE", "3")

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index cca92dd..360914e 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -134,6 +134,12 @@ class DbStorage extends AbstractIdleService implements Storage {
       public JobUpdateStore.Mutable getJobUpdateStore() {
         return jobUpdateStore;
       }
+
+      @Override
+      @SuppressWarnings("unchecked")
+      public <T> T getUnsafeStoreAccess() {
+        return (T) sessionFactory.getConfiguration().getEnvironment().getDataSource();
+      }
     };
     this.statsProvider = requireNonNull(statsProvider);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index ed63a74..7dcd1bf 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -31,8 +31,10 @@ import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.db.DbModule;
 import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
 import org.apache.aurora.scheduler.storage.log.LogStorage.Settings;
+import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.ExperimentalTaskStore;
 
 import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
 import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
@@ -67,6 +69,9 @@ public class LogStorageModule extends PrivateModule {
     bind(Settings.class)
         .toInstance(new Settings(SHUTDOWN_GRACE_PERIOD.get(), SNAPSHOT_INTERVAL.get()));
 
+    bind(new TypeLiteral<Boolean>() { }).annotatedWith(ExperimentalTaskStore.class)
+        .toInstance(DbModule.USE_DB_TASK_STORE.get());
+
     bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
         .toInstance(MAX_LOG_ENTRY_SIZE.get());
     bind(LogManager.class).in(Singleton.class);
@@ -77,6 +82,7 @@ public class LogStorageModule extends PrivateModule {
     expose(Storage.class);
     expose(NonVolatileStorage.class);
     expose(DistributedSnapshotStore.class);
+    expose(new TypeLiteral<Boolean>() { }).annotatedWith(ExperimentalTaskStore.class);
 
     bind(EntrySerializer.class).to(EntrySerializerImpl.class);
     // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index db90150..6fee251 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -13,13 +13,28 @@
  */
 package org.apache.aurora.scheduler.storage.log;
 
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 import javax.inject.Inject;
+import javax.inject.Qualifier;
+import javax.sql.DataSource;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.util.BuildInfo;
@@ -40,7 +55,6 @@ import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Volatile;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -64,17 +78,94 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SnapshotStoreImpl.class);
 
-  private static final Iterable<SnapshotField> SNAPSHOT_FIELDS = Arrays.asList(
+  /**
+   * Number of rows to run in a single batch during dbsnapshot restore.
+   */
+  private static final int DB_BATCH_SIZE = 20;
+
+  private static boolean hasDbSnapshot(Snapshot snapshot) {
+    return snapshot.isSetDbScript();
+  }
+
+  private boolean hasDbTaskStore(Snapshot snapshot) {
+    return useDbSnapshotForTaskStore
+        && hasDbSnapshot(snapshot)
+        && snapshot.isExperimentalTaskStore();
+  }
+
+  private final Iterable<SnapshotField> snapshotFields = Arrays.asList(
+      // Order is critical here. The DB snapshot should always be tried first to ensure
+      // graceful migration to DBTaskStore. Otherwise, there is a direct risk of losing the cluster.
+      // The following scenario illustrates how that can happen:
+      // - Dbsnapshot:ON, DBTaskStore:OFF
+      // - Scheduler is updated with DBTaskStore:ON, restarts and populates all tasks from snapshot
+      // - Should the dbsnapshot get applied last, all tables would be dropped and recreated BUT
+      //   since there was no task data stored in dbsnapshot (DBTaskStore was OFF last time
+      //   snapshot was cut), all tasks would be erased
+      // - If the above is not detected before a new snapshot is cut all tasks will be dropped the
+      //   moment a new snapshot is created
+      new SnapshotField() {
+        @Override
+        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+          LOG.info("Saving dbsnapshot");
+          // Note: we don't use mybatis mapped statements for performance reasons and to avoid
+          // mapping/unmapping hassle as snapshot commands should never be used upstream.
+          try (Connection c = ((DataSource) store.getUnsafeStoreAccess()).getConnection()) {
+            try (PreparedStatement ps = c.prepareStatement("SCRIPT")) {
+              try (ResultSet rs = ps.executeQuery()) {
+                ImmutableList.Builder<String> builder = ImmutableList.builder();
+                while (rs.next()) {
+                  String columnValue = rs.getString("SCRIPT");
+                  builder.add(columnValue + "\n");
+                }
+                snapshot.setDbScript(builder.build());
+              }
+            }
+          } catch (SQLException e) {
+            Throwables.propagate(e);
+          }
+        }
+
+        @Override
+        public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+
+          if (snapshot.isSetDbScript()) {
+            try (Connection c = ((DataSource) store.getUnsafeStoreAccess()).getConnection()) {
+              LOG.info("Dropping all tables");
+              try (PreparedStatement drop = c.prepareStatement("DROP ALL OBJECTS")) {
+                drop.executeUpdate();
+              }
+
+              LOG.info("Restoring dbsnapshot. Row count: " + snapshot.getDbScript().size());
+              // Partition the restore script into manageable batches to avoid a possible OOM
+              // caused by an oversized DML statement.
+              List<List<String>> batches = Lists.partition(snapshot.getDbScript(), DB_BATCH_SIZE);
+              for (List<String> batch : batches) {
+                try (PreparedStatement restore = c.prepareStatement(Joiner.on("").join(batch))) {
+                  restore.executeUpdate();
+                }
+              }
+            } catch (SQLException e) {
+              Throwables.propagate(e);
+            }
+          }
+        }
+      },
       new SnapshotField() {
         // It's important for locks to be replayed first, since there are relations that expect
         // references to be valid on insertion.
         @Override
-        public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks()));
         }
 
         @Override
         public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+          if (hasDbSnapshot(snapshot)) {
+            LOG.info("Deferring lock restore to dbsnapshot");
+            return;
+          }
+
           store.getLockStore().deleteLocks();
 
           if (snapshot.isSetLocks()) {
@@ -86,38 +177,42 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
       },
       new SnapshotField() {
         @Override
-        public void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot) {
+        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           snapshot.setHostAttributes(
-              IHostAttributes.toBuildersSet(storeProvider.getAttributeStore().getHostAttributes()));
+              IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes()));
         }
 
         @Override
         public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+          if (hasDbSnapshot(snapshot)) {
+            LOG.info("Deferring attribute restore to dbsnapshot");
+            return;
+          }
+
           store.getAttributeStore().deleteHostAttributes();
 
           if (snapshot.isSetHostAttributes()) {
             for (HostAttributes attributes : snapshot.getHostAttributes()) {
-              // Prior to commit 5cf760b, the store would persist maintenance mode changes for
-              // unknown hosts.  5cf760b began rejecting these, but the replicated log may still
-              // contain entries with a null slave ID.
-              if (attributes.isSetSlaveId()) {
-                store.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes));
-              } else {
-                LOG.info("Dropping host attributes with no slave ID: " + attributes);
-              }
+              store.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes));
             }
           }
         }
       },
       new SnapshotField() {
         @Override
-        public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           snapshot.setTasks(
               IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
+          snapshot.setExperimentalTaskStore(useDbSnapshotForTaskStore);
         }
 
         @Override
         public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+          if (hasDbTaskStore(snapshot)) {
+            LOG.info("Deferring task restore to dbsnapshot");
+            return;
+          }
+
           store.getUnsafeTaskStore().deleteAllTasks();
 
           if (snapshot.isSetTasks()) {
@@ -128,17 +223,23 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
       },
       new SnapshotField() {
         @Override
-        public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder();
 
           for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) {
             jobs.add(new StoredCronJob(config.newBuilder()));
           }
           snapshot.setCronJobs(jobs.build());
+          snapshot.setExperimentalTaskStore(useDbSnapshotForTaskStore);
         }
 
         @Override
         public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+          if (hasDbTaskStore(snapshot)) {
+            LOG.info("Deferring cron job restore to dbsnapshot");
+            return;
+          }
+
           store.getCronJobStore().deleteJobs();
 
           if (snapshot.isSetCronJobs()) {
@@ -151,12 +252,17 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
       },
       new SnapshotField() {
         @Override
-        public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           // SchedulerMetadata is updated outside of the static list of SnapshotFields
         }
 
         @Override
         public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+          if (hasDbSnapshot(snapshot)) {
+            LOG.info("Deferring metadata restore to dbsnapshot");
+            return;
+          }
+
           if (snapshot.isSetSchedulerMetadata()
               && snapshot.getSchedulerMetadata().isSetFrameworkId()) {
             // No delete necessary here since this is a single value.
@@ -168,7 +274,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
       },
       new SnapshotField() {
         @Override
-        public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
           for (Map.Entry<String, IResourceAggregate> entry
               : store.getQuotaStore().fetchQuotas().entrySet()) {
@@ -181,6 +287,11 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
 
         @Override
         public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+          if (hasDbSnapshot(snapshot)) {
+            LOG.info("Deferring quota restore to dbsnapshot");
+            return;
+          }
+
           store.getQuotaStore().deleteQuotas();
 
           if (snapshot.isSetQuotaConfigurations()) {
@@ -193,12 +304,17 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
       },
       new SnapshotField() {
         @Override
-        public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           snapshot.setJobUpdateDetails(store.getJobUpdateStore().fetchAllJobUpdateDetails());
         }
 
         @Override
         public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+          if (hasDbSnapshot(snapshot)) {
+            LOG.info("Deferring job update restore to dbsnapshot");
+            return;
+          }
+
           JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
           updateStore.deleteAllUpdatesAndEvents();
 
@@ -233,12 +349,27 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
   private final BuildInfo buildInfo;
   private final Clock clock;
   private final Storage storage;
+  private final boolean useDbSnapshotForTaskStore;
+
+  /**
+   * Identifies whether the experimental task store is in use.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ ElementType.PARAMETER, ElementType.METHOD })
+  @Qualifier
+  public @interface ExperimentalTaskStore { }
 
   @Inject
-  public SnapshotStoreImpl(BuildInfo buildInfo, Clock clock, @Volatile Storage storage) {
+  public SnapshotStoreImpl(
+      BuildInfo buildInfo,
+      Clock clock,
+      @Volatile Storage storage,
+      @ExperimentalTaskStore boolean useDbSnapshotForTaskStore) {
+
     this.buildInfo = requireNonNull(buildInfo);
     this.clock = requireNonNull(clock);
     this.storage = requireNonNull(storage);
+    this.useDbSnapshotForTaskStore = useDbSnapshotForTaskStore;
   }
 
   @Timed("snapshot_create")
@@ -252,7 +383,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
       // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
       // one of the field closures is mean and tries to apply a timestamp.
       long timestamp = clock.nowMillis();
-      for (SnapshotField field : SNAPSHOT_FIELDS) {
+      for (SnapshotField field : snapshotFields) {
         field.saveToSnapshot(storeProvider, snapshot);
       }
 
@@ -274,14 +405,14 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
     storage.write((NoResult.Quiet) storeProvider -> {
       LOG.info("Restoring snapshot.");
 
-      for (SnapshotField field : SNAPSHOT_FIELDS) {
+      for (SnapshotField field : snapshotFields) {
         field.restoreFromSnapshot(storeProvider, snapshot);
       }
     });
   }
 
   private interface SnapshotField {
-    void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
+    void saveToSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
 
     void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index 2f07afb..d0de063 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -381,4 +381,10 @@ class WriteAheadStorage extends WriteAheadStorageForwarder implements
   public JobUpdateStore.Mutable getJobUpdateStore() {
     return this;
   }
+
+  @Override
+  public <T> T getUnsafeStoreAccess() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
index 172dd20..a33f6f7 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
@@ -96,7 +96,8 @@ public class RecoveryTest extends EasyMockTest {
     expect(snapshotStore.createSnapshot()).andReturn(SNAPSHOT1);
     Capture<MutateWork<Object, Exception>> transaction = createCapture();
     expect(primaryStorage.write(capture(transaction))).andReturn(null);
-    distributedStore.persist(SNAPSHOT1);
+    Capture<Snapshot> snapshot = createCapture();
+    distributedStore.persist(capture(snapshot));
     shutDownNow.execute();
 
     control.replay();
@@ -114,6 +115,9 @@ public class RecoveryTest extends EasyMockTest {
         recovery.query(Query.unscoped()));
     recovery.commit();
     transaction.getValue().apply(storeProvider);
+
+    snapshot.getValue().unsetDbScript();
+    assertEquals(SNAPSHOT1, snapshot.getValue());
   }
 
   @Test
@@ -122,7 +126,8 @@ public class RecoveryTest extends EasyMockTest {
     Snapshot modified = SNAPSHOT1.deepCopy().setTasks(ImmutableSet.of(TASK1.newBuilder()));
     Capture<MutateWork<Object, Exception>> transaction = createCapture();
     expect(primaryStorage.write(capture(transaction))).andReturn(null);
-    distributedStore.persist(modified);
+    Capture<Snapshot> snapshot = createCapture();
+    distributedStore.persist(capture(snapshot));
     shutDownNow.execute();
 
     control.replay();
@@ -140,6 +145,9 @@ public class RecoveryTest extends EasyMockTest {
         recovery.query(Query.unscoped()));
     recovery.commit();
     transaction.getValue().apply(storeProvider);
+
+    snapshot.getValue().unsetDbScript();
+    assertEquals(modified, snapshot.getValue());
   }
 
   @Test(expected = RecoveryException.class)

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
new file mode 100644
index 0000000..6a39d89
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.Map;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.util.testing.FakeBuildInfo;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateState;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Lock;
+import org.apache.aurora.gen.LockKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.storage.QuotaConfiguration;
+import org.apache.aurora.gen.storage.SchedulerMetadata;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.gen.storage.StoredCronJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
+import org.apache.aurora.scheduler.ResourceAggregates;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.InMemStoresModule;
+import org.junit.Test;
+
+import static org.apache.aurora.common.inject.Bindings.KeyFactory.PLAIN;
+import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo;
+import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import static org.apache.aurora.scheduler.storage.db.DbModule.testModule;
+import static org.apache.aurora.scheduler.storage.db.DbUtil.createStorage;
+import static org.apache.aurora.scheduler.storage.db.DbUtil.createStorageInjector;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SnapshotStoreImplIT {
+
+  private static final long NOW = 10335463456L;
+  private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "job");
+
+  private Storage storage;
+  private SnapshotStore<Snapshot> snapshotStore;
+
+  private void setUpStore(boolean dbTaskStore) {
+    storage = dbTaskStore
+        ? createStorage()
+        : createStorageInjector(
+        testModule(PLAIN, Optional.of(new InMemStoresModule(PLAIN)))).getInstance(Storage.class);
+
+    FakeClock clock = new FakeClock();
+    clock.setNowMillis(NOW);
+    snapshotStore = new SnapshotStoreImpl(
+        generateBuildInfo(),
+        clock,
+        storage,
+        dbTaskStore);
+  }
+
+  private static Snapshot makeComparable(Snapshot snapshot) {
+    Snapshot copy = snapshot.deepCopy();
+    // Ignore DB snapshot. It will be tested by asserting the DB data.
+    copy.unsetDbScript();
+    copy.setExperimentalTaskStore(false);
+    return copy;
+  }
+
+  @Test
+  public void testNoDBTaskStore() {
+    setUpStore(false);
+    populateStore();
+
+    Snapshot snapshot1 = snapshotStore.createSnapshot();
+    assertEquals(expected(), makeComparable(snapshot1));
+    assertFalse(snapshot1.isExperimentalTaskStore());
+
+    snapshotStore.applySnapshot(snapshot1);
+    Snapshot snapshot2 = snapshotStore.createSnapshot();
+    assertEquals(expected(), makeComparable(snapshot2));
+    assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+  }
+
+  @Test
+  public void testMigrateToDBTaskStore() {
+    setUpStore(false);
+    populateStore();
+
+    Snapshot snapshot1 = snapshotStore.createSnapshot();
+    assertEquals(expected(), makeComparable(snapshot1));
+    assertFalse(snapshot1.isExperimentalTaskStore());
+
+    setUpStore(true);
+    snapshotStore.applySnapshot(snapshot1);
+    Snapshot snapshot2 = snapshotStore.createSnapshot();
+    assertTrue(snapshot2.isExperimentalTaskStore());
+    assertEquals(expected(), makeComparable(snapshot2));
+    assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+  }
+
+  @Test
+  public void testMigrateFromDBTaskStore() {
+    setUpStore(true);
+    populateStore();
+
+    Snapshot snapshot1 = snapshotStore.createSnapshot();
+    assertEquals(expected(), makeComparable(snapshot1));
+    assertTrue(snapshot1.isExperimentalTaskStore());
+
+    setUpStore(false);
+    snapshotStore.applySnapshot(snapshot1);
+    Snapshot snapshot2 = snapshotStore.createSnapshot();
+    assertFalse(snapshot2.isExperimentalTaskStore());
+    assertEquals(expected(), makeComparable(snapshot2));
+    assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+  }
+
+  @Test
+  public void testDBTaskStore() {
+    setUpStore(true);
+    populateStore();
+
+    Snapshot snapshot1 = snapshotStore.createSnapshot();
+    assertEquals(expected(), makeComparable(snapshot1));
+    assertTrue(snapshot1.isExperimentalTaskStore());
+
+    snapshotStore.applySnapshot(snapshot1);
+    Snapshot snapshot2 = snapshotStore.createSnapshot();
+    assertEquals(expected(), makeComparable(snapshot2));
+    assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+  }
+
+  private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", JOB_KEY);
+  private static final ITaskConfig TASK_CONFIG = TaskTestUtil.makeConfig(JOB_KEY);
+  private static final IJobConfiguration CRON_JOB = IJobConfiguration.build(new JobConfiguration()
+      .setKey(new JobKey("owner", "env", "name"))
+      .setOwner(new Identity("user"))
+      .setCronSchedule("* * * * *")
+      .setCronCollisionPolicy(CronCollisionPolicy.KILL_EXISTING)
+      .setInstanceCount(1)
+      .setTaskConfig(TASK_CONFIG.newBuilder()));
+  private static final String ROLE = "role";
+  private static final IResourceAggregate QUOTA = ResourceAggregates.LARGE;
+  private static final IHostAttributes ATTRIBUTES = IHostAttributes.build(
+      new HostAttributes("host", ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value"))))
+          .setMode(MaintenanceMode.NONE)
+          .setSlaveId("slave id"));
+  private static final String FRAMEWORK_ID = "framework_id";
+  private static final Map<String, String> METADATA = ImmutableMap.of(
+          FakeBuildInfo.DATE, FakeBuildInfo.DATE,
+          FakeBuildInfo.GIT_REVISION, FakeBuildInfo.GIT_REVISION,
+          FakeBuildInfo.GIT_TAG, FakeBuildInfo.GIT_TAG);
+  private static final ILock LOCK = ILock.build(new Lock()
+      .setKey(LockKey.job(JobKeys.from("testRole", "testEnv", "testJob").newBuilder()))
+      .setToken("lockId")
+      .setUser("testUser")
+      .setTimestampMs(12345L));
+  private static final IJobUpdateKey UPDATE_ID =
+      IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "updateId1"));
+  private static final IJobUpdateDetails UPDATE = IJobUpdateDetails.build(new JobUpdateDetails()
+      .setUpdate(new JobUpdate()
+          .setInstructions(new JobUpdateInstructions()
+              .setDesiredState(new InstanceTaskConfig()
+                  .setTask(TASK_CONFIG.newBuilder())
+                  .setInstances(ImmutableSet.of(new Range(0, 7))))
+              .setInitialState(ImmutableSet.of(
+                  new InstanceTaskConfig()
+                      .setInstances(ImmutableSet.of(new Range(0, 1)))
+                      .setTask(TASK_CONFIG.newBuilder())))
+              .setSettings(new JobUpdateSettings()
+                  .setBlockIfNoPulsesAfterMs(500)
+                  .setUpdateGroupSize(1)
+                  .setMaxPerInstanceFailures(1)
+                  .setMaxFailedInstances(1)
+                  .setMinWaitInInstanceRunningMs(200)
+                  .setRollbackOnFailure(true)
+                  .setWaitForBatchCompletion(true)
+                  .setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(0, 0)))))
+          .setSummary(new JobUpdateSummary()
+              .setState(new JobUpdateState().setStatus(JobUpdateStatus.ERROR))
+              .setUser("user")
+              .setKey(UPDATE_ID.newBuilder())))
+      .setUpdateEvents(ImmutableList.of(new JobUpdateEvent()
+          .setUser("user")
+          .setMessage("message")
+          .setStatus(JobUpdateStatus.ERROR)))
+      .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent()
+          .setAction(JobUpdateAction.INSTANCE_UPDATED))));
+
+  private Snapshot expected() {
+    return new Snapshot()
+        .setTimestamp(NOW)
+        .setTasks(ImmutableSet.of(TASK.newBuilder()))
+        .setQuotaConfigurations(ImmutableSet.of(new QuotaConfiguration(ROLE, QUOTA.newBuilder())))
+        .setHostAttributes(ImmutableSet.of(ATTRIBUTES.newBuilder()))
+        .setCronJobs(ImmutableSet.of(new StoredCronJob(CRON_JOB.newBuilder())))
+        .setSchedulerMetadata(new SchedulerMetadata(FRAMEWORK_ID, METADATA))
+        .setLocks(ImmutableSet.of(LOCK.newBuilder()))
+        .setJobUpdateDetails(ImmutableSet.of(
+            new StoredJobUpdateDetails(UPDATE.newBuilder(), LOCK.getToken())));
+  }
+
+  private void populateStore() {
+    storage.write((NoResult.Quiet) store -> {
+      store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(TASK));
+      store.getCronJobStore().saveAcceptedJob(CRON_JOB);
+      store.getQuotaStore().saveQuota(ROLE, QUOTA);
+      store.getAttributeStore().saveHostAttributes(ATTRIBUTES);
+      store.getSchedulerStore().saveFrameworkId(FRAMEWORK_ID);
+      store.getLockStore().saveLock(LOCK);
+      store.getJobUpdateStore().saveJobUpdate(UPDATE.getUpdate(), Optional.of(LOCK.getToken()));
+      store.getJobUpdateStore().saveJobUpdateEvent(
+          UPDATE.getUpdate().getSummary().getKey(),
+          UPDATE.getUpdateEvents().get(0));
+      store.getJobUpdateStore().saveJobInstanceUpdateEvent(
+          UPDATE.getUpdate().getSummary().getKey(),
+          UPDATE.getInstanceEvents().get(0)
+      );
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
deleted file mode 100644
index 4407867..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.util.testing.FakeBuildInfo;
-import org.apache.aurora.common.util.testing.FakeClock;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateDetails;
-import org.apache.aurora.gen.JobUpdateEvent;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.JobUpdateSummary;
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.storage.QuotaConfiguration;
-import org.apache.aurora.gen.storage.SchedulerMetadata;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.StoredCronJob;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
-import org.apache.aurora.scheduler.ResourceAggregates;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class SnapshotStoreImplTest extends EasyMockTest {
-
-  private static final long NOW = 10335463456L;
-  private static final JobKey JOB_KEY = JobKeys.from("role", "env", "job").newBuilder();
-
-  private StorageTestUtil storageUtil;
-  private SnapshotStore<Snapshot> snapshotStore;
-
-  @Before
-  public void setUp() {
-    FakeClock clock = new FakeClock();
-    clock.setNowMillis(NOW);
-    storageUtil = new StorageTestUtil(this);
-    snapshotStore = new SnapshotStoreImpl(
-        generateBuildInfo(),
-        clock,
-        storageUtil.storage);
-  }
-
-  private static IJobUpdateKey makeKey(String id) {
-    return IJobUpdateKey.build(new JobUpdateKey(JOB_KEY, id));
-  }
-
-  @Test
-  public void testCreateAndRestoreNewSnapshot() {
-    ImmutableSet<IScheduledTask> tasks = ImmutableSet.of(
-        IScheduledTask.build(new ScheduledTask().setStatus(ScheduleStatus.PENDING)));
-
-    Set<QuotaConfiguration> quotas =
-        ImmutableSet.of(
-            new QuotaConfiguration("steve", ResourceAggregates.EMPTY.newBuilder()));
-    IHostAttributes attribute = IHostAttributes.build(
-        new HostAttributes("host", ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value"))))
-            .setSlaveId("slave id"));
-    // A legacy attribute that has a maintenance mode set, but nothing else.  These should be
-    // dropped.
-    IHostAttributes legacyAttribute = IHostAttributes.build(
-        new HostAttributes("host", ImmutableSet.of()));
-    StoredCronJob job = new StoredCronJob(
-        new JobConfiguration().setKey(new JobKey("owner", "env", "name")));
-    String frameworkId = "framework_id";
-    ILock lock = ILock.build(new Lock()
-        .setKey(LockKey.job(JobKeys.from("testRole", "testEnv", "testJob").newBuilder()))
-        .setToken("lockId")
-        .setUser("testUser")
-        .setTimestampMs(12345L));
-    SchedulerMetadata metadata = new SchedulerMetadata().setFrameworkId(frameworkId);
-    metadata.setDetails(Maps.newHashMap());
-    metadata.getDetails().put(FakeBuildInfo.DATE, FakeBuildInfo.DATE);
-    metadata.getDetails().put(FakeBuildInfo.GIT_REVISION, FakeBuildInfo.GIT_REVISION);
-    metadata.getDetails().put(FakeBuildInfo.GIT_TAG, FakeBuildInfo.GIT_TAG);
-    IJobUpdateKey updateId1 =  makeKey("updateId1");
-    IJobUpdateKey updateId2 = makeKey("updateId2");
-    IJobUpdateDetails updateDetails1 = IJobUpdateDetails.build(new JobUpdateDetails()
-        .setUpdate(new JobUpdate().setSummary(
-            new JobUpdateSummary().setKey(updateId1.newBuilder())))
-        .setUpdateEvents(ImmutableList.of(new JobUpdateEvent().setStatus(JobUpdateStatus.ERROR)))
-        .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent().setTimestampMs(123L))));
-
-    IJobUpdateDetails updateDetails2 = IJobUpdateDetails.build(new JobUpdateDetails()
-        .setUpdate(new JobUpdate().setSummary(
-            new JobUpdateSummary().setKey(updateId2.newBuilder()))));
-
-    storageUtil.expectOperations();
-    expect(storageUtil.taskStore.fetchTasks(Query.unscoped())).andReturn(tasks);
-    expect(storageUtil.quotaStore.fetchQuotas())
-        .andReturn(ImmutableMap.of("steve", ResourceAggregates.EMPTY));
-    expect(storageUtil.attributeStore.getHostAttributes())
-        .andReturn(ImmutableSet.of(attribute, legacyAttribute));
-    expect(storageUtil.jobStore.fetchJobs())
-        .andReturn(ImmutableSet.of(IJobConfiguration.build(job.getJobConfiguration())));
-    expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(frameworkId));
-    expect(storageUtil.lockStore.fetchLocks()).andReturn(ImmutableSet.of(lock));
-    String lockToken = "token";
-    expect(storageUtil.jobUpdateStore.fetchAllJobUpdateDetails())
-        .andReturn(ImmutableSet.of(
-            new StoredJobUpdateDetails(updateDetails1.newBuilder(), lockToken),
-            new StoredJobUpdateDetails(updateDetails2.newBuilder(), null)));
-
-    expectDataWipe();
-    storageUtil.taskStore.saveTasks(tasks);
-    storageUtil.quotaStore.saveQuota("steve", ResourceAggregates.EMPTY);
-    expect(storageUtil.attributeStore.saveHostAttributes(attribute)).andReturn(true);
-    storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(job.getJobConfiguration()));
-    storageUtil.schedulerStore.saveFrameworkId(frameworkId);
-    storageUtil.lockStore.saveLock(lock);
-    storageUtil.jobUpdateStore.saveJobUpdate(
-        updateDetails1.getUpdate(), Optional.fromNullable(lockToken));
-    storageUtil.jobUpdateStore.saveJobUpdateEvent(
-        updateId1,
-        Iterables.getOnlyElement(updateDetails1.getUpdateEvents()));
-    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
-        updateId1,
-        Iterables.getOnlyElement(updateDetails1.getInstanceEvents()));
-
-    // The saved object for update2 should be backfilled with update key.
-    JobUpdate update2Expected = updateDetails2.getUpdate().newBuilder();
-    update2Expected.getSummary().setKey(updateId2.newBuilder());
-    storageUtil.jobUpdateStore.saveJobUpdate(
-        IJobUpdate.build(update2Expected), Optional.absent());
-
-    control.replay();
-
-    Snapshot expected = new Snapshot()
-        .setTimestamp(NOW)
-        .setTasks(IScheduledTask.toBuildersSet(tasks))
-        .setQuotaConfigurations(quotas)
-        .setHostAttributes(ImmutableSet.of(attribute.newBuilder(), legacyAttribute.newBuilder()))
-        .setCronJobs(ImmutableSet.of(job))
-        .setSchedulerMetadata(metadata)
-        .setLocks(ImmutableSet.of(lock.newBuilder()))
-        .setJobUpdateDetails(ImmutableSet.of(
-            new StoredJobUpdateDetails(updateDetails1.newBuilder(), lockToken),
-            new StoredJobUpdateDetails(updateDetails2.newBuilder(), null)));
-
-    Snapshot snapshot = snapshotStore.createSnapshot();
-    assertEquals(expected, snapshot);
-
-    snapshotStore.applySnapshot(expected);
-  }
-
-  private void expectDataWipe() {
-    storageUtil.taskStore.deleteAllTasks();
-    storageUtil.quotaStore.deleteQuotas();
-    storageUtil.attributeStore.deleteHostAttributes();
-    storageUtil.jobStore.deleteJobs();
-    storageUtil.lockStore.deleteLocks();
-    storageUtil.jobUpdateStore.deleteAllUpdatesAndEvents();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
index 75130a3..b469f9b 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
@@ -150,6 +150,7 @@ test_update() {
   assert_update_state $_jobkey 'ROLLING_FORWARD'
   local _update_id=$(aurora update list $_jobkey --status ROLLING_FORWARD \
       | tail -n +2 | awk '{print $2}')
+  aurora_admin scheduler_snapshot devcluster
   sudo restart aurora-scheduler
   assert_update_state $_jobkey 'ROLLING_FORWARD'
   aurora update pause $_jobkey --message='hello'