You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/14 04:38:02 UTC

[1/2] aurora git commit: Recover snapshots via the Op stream

Repository: aurora
Updated Branches:
  refs/heads/master 4489dc345 -> 5f79f7ca7


http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java
new file mode 100644
index 0000000..b30de88
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import javax.inject.Inject;
+
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link SnapshotStore} that snapshots to the log, and automatically snapshots on
+ * a fixed interval.
+ */
+class SnapshotService extends AbstractScheduledService implements SnapshotStore {
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotService.class);
+
+  private final Storage storage;
+  private final LogPersistence log;
+  private final Snapshotter snapshotter;
+  private final Amount<Long, Time> snapshotInterval;
+
+  @Inject
+  SnapshotService(Storage storage, LogPersistence log, Snapshotter snapshotter, Settings settings) {
+    this.storage = requireNonNull(storage);
+    this.log = requireNonNull(log);
+    this.snapshotter = requireNonNull(snapshotter);
+    this.snapshotInterval = settings.getSnapshotInterval();
+  }
+
+  @Override
+  protected void runOneIteration() {
+    snapshot();
+  }
+
+  @Timed("scheduler_log_snapshot")
+  @Override
+  public void snapshot() throws StorageException {
+    try {
+      LOG.info("Creating snapshot");
+
+      // Create and persist the snapshot under the storage write lock so that all upstream callers
+      // are correctly synchronized (e.g. during backup creation).
+      storage.write((NoResult.Quiet) stores -> {
+        Snapshot snapshot = snapshotter.from(stores);
+        LOG.info("Saving snapshot");
+        snapshotWith(snapshot);
+
+        LOG.info("Snapshot complete."
+            + " host attrs: " + snapshot.getHostAttributesSize()
+            + ", cron jobs: " + snapshot.getCronJobsSize()
+            + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
+            + ", tasks: " + snapshot.getTasksSize()
+            + ", updates: " + snapshot.getJobUpdateDetailsSize());
+      });
+    } catch (CodingException e) {
+      throw new StorageException("Failed to encode a snapshot", e);
+    } catch (InvalidPositionException e) {
+      throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
+    } catch (StreamAccessException e) {
+      throw new StorageException("Failed to create a snapshot", e);
+    }
+  }
+
+  @Timed("scheduler_log_snapshot_persist")
+  @Override
+  public void snapshotWith(Snapshot snapshot)
+      throws CodingException, InvalidPositionException, StreamAccessException {
+
+    log.persist(snapshot);
+  }
+
+  @Override
+  protected Scheduler scheduler() {
+    return Scheduler.newFixedDelaySchedule(
+        snapshotInterval.getValue(),
+        snapshotInterval.getValue(),
+        snapshotInterval.getUnit().getTimeUnit());
+  }
+
+  /**
+   * Configuration settings for the snapshot service: the interval between automatic snapshots.
+   */
+  public static class Settings {
+    private final Amount<Long, Time> snapshotInterval;
+
+    Settings(Amount<Long, Time> snapshotInterval) {
+      this.snapshotInterval = requireNonNull(snapshotInterval);
+    }
+
+    public Amount<Long, Time> getSnapshotInterval() {
+      return snapshotInterval;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 739fad7..5aefe5f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -13,10 +13,11 @@
  */
 package org.apache.aurora.scheduler.storage.log;
 
-import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -24,36 +25,35 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Streams;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.stats.SlidingStats;
 import org.apache.aurora.common.stats.SlidingStats.Timeable;
 import org.apache.aurora.common.util.BuildInfo;
 import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobUpdateDetails;
-import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.storage.Op;
 import org.apache.aurora.gen.storage.QuotaConfiguration;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
 import org.apache.aurora.gen.storage.SchedulerMetadata;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.StoredCronJob;
 import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.Storage.Volatile;
-import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.slf4j.Logger;
@@ -65,7 +65,7 @@ import static java.util.Objects.requireNonNull;
  * Snapshot store implementation that delegates to underlying snapshot stores by
  * extracting/applying fields in a snapshot thrift struct.
  */
-public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
+public class SnapshotStoreImpl implements Snapshotter {
 
   @VisibleForTesting
   static final String SNAPSHOT_SAVE = "snapshot_save_";
@@ -83,63 +83,62 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
 
   @VisibleForTesting
   Set<String> snapshotFieldNames() {
-    return FluentIterable.from(snapshotFields)
-        .transform(SnapshotField::getName)
-        .toSet();
+    return snapshotFields.stream()
+        .map(SnapshotField::getName)
+        .collect(Collectors.toSet());
   }
 
-  private final Iterable<SnapshotField> snapshotFields = Arrays.asList(
+  private final List<SnapshotField> snapshotFields = ImmutableList.of(
       new SnapshotField() {
         @Override
-        public String getName() {
+        String getName() {
           return HOST_ATTRIBUTES_FIELD;
         }
 
         @Override
-        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
           snapshot.setHostAttributes(
               IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes()));
         }
 
         @Override
-        public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
           if (snapshot.getHostAttributesSize() > 0) {
-            store.getAttributeStore().deleteHostAttributes();
-            for (HostAttributes attributes : snapshot.getHostAttributes()) {
-              store.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes));
-            }
+            return snapshot.getHostAttributes().stream()
+                .map(attributes -> Op.saveHostAttributes(
+                    new SaveHostAttributes().setHostAttributes(attributes)));
           }
+          return Stream.empty();
         }
       },
       new SnapshotField() {
         @Override
-        public String getName() {
+        String getName() {
           return TASK_FIELD;
         }
 
         @Override
-        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
           snapshot.setTasks(
               IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
         }
 
         @Override
-        public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
           if (snapshot.getTasksSize() > 0) {
-            store.getUnsafeTaskStore().deleteAllTasks();
-            store.getUnsafeTaskStore()
-                .saveTasks(thriftBackfill.backfillTasks(snapshot.getTasks()));
+            return Stream.of(Op.saveTasks(new SaveTasks().setTasks(snapshot.getTasks())));
           }
+          return Stream.empty();
         }
       },
       new SnapshotField() {
         @Override
-        public String getName() {
+        String getName() {
           return CRON_FIELD;
         }
 
         @Override
-        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
           ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder();
 
           for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) {
@@ -149,46 +148,46 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
         }
 
         @Override
-        public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
           if (snapshot.getCronJobsSize() > 0) {
-            store.getCronJobStore().deleteJobs();
-            for (StoredCronJob job : snapshot.getCronJobs()) {
-              store.getCronJobStore().saveAcceptedJob(
-                  thriftBackfill.backfillJobConfiguration(job.getJobConfiguration()));
-            }
+            return snapshot.getCronJobs().stream()
+                .map(job -> Op.saveCronJob(
+                    new SaveCronJob().setJobConfig(job.getJobConfiguration())));
           }
+          return Stream.empty();
         }
       },
       new SnapshotField() {
         @Override
-        public String getName() {
+        String getName() {
           return SCHEDULER_METADATA_FIELD;
         }
 
         @Override
-        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
           // SchedulerMetadata is updated outside of the static list of SnapshotFields
         }
 
         @Override
-        public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
           if (snapshot.isSetSchedulerMetadata()
               && snapshot.getSchedulerMetadata().isSetFrameworkId()) {
             // No delete necessary here since this is a single value.
 
-            store.getSchedulerStore()
-                .saveFrameworkId(snapshot.getSchedulerMetadata().getFrameworkId());
+            return Stream.of(Op.saveFrameworkId(
+                new SaveFrameworkId().setId(snapshot.getSchedulerMetadata().getFrameworkId())));
           }
+          return Stream.empty();
         }
       },
       new SnapshotField() {
         @Override
-        public String getName() {
+        String getName() {
           return QUOTA_FIELD;
         }
 
         @Override
-        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
           ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
           for (Map.Entry<String, IResourceAggregate> entry
               : store.getQuotaStore().fetchQuotas().entrySet()) {
@@ -200,24 +199,24 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
         }
 
         @Override
-        public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
           if (snapshot.getQuotaConfigurationsSize() > 0) {
-            store.getQuotaStore().deleteQuotas();
-            for (QuotaConfiguration quota : snapshot.getQuotaConfigurations()) {
-              store.getQuotaStore()
-                  .saveQuota(quota.getRole(), IResourceAggregate.build(quota.getQuota()));
-            }
+            return snapshot.getQuotaConfigurations().stream()
+                .map(quota -> Op.saveQuota(new SaveQuota()
+                    .setRole(quota.getRole())
+                    .setQuota(quota.getQuota())));
           }
+          return Stream.empty();
         }
       },
       new SnapshotField() {
         @Override
-        public String getName() {
+        String getName() {
           return JOB_UPDATE_FIELD;
         }
 
         @Override
-        public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
           snapshot.setJobUpdateDetails(
               store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
                   .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder()))
@@ -225,112 +224,101 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
         }
 
         @Override
-        public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
           if (snapshot.getJobUpdateDetailsSize() > 0) {
-            JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
-            updateStore.deleteAllUpdates();
-            for (StoredJobUpdateDetails storedDetails : snapshot.getJobUpdateDetails()) {
-              JobUpdateDetails details = storedDetails.getDetails();
-              updateStore.saveJobUpdate(thriftBackfill.backFillJobUpdate(details.getUpdate()));
-
-              if (details.getUpdateEventsSize() > 0) {
-                for (JobUpdateEvent updateEvent : details.getUpdateEvents()) {
-                  updateStore.saveJobUpdateEvent(
-                      IJobUpdateKey.build(details.getUpdate().getSummary().getKey()),
-                      IJobUpdateEvent.build(updateEvent));
-                }
-              }
-
-              if (details.getInstanceEventsSize() > 0) {
-                for (JobInstanceUpdateEvent instanceEvent : details.getInstanceEvents()) {
-                  updateStore.saveJobInstanceUpdateEvent(
-                      IJobUpdateKey.build(details.getUpdate().getSummary().getKey()),
-                      IJobInstanceUpdateEvent.build(instanceEvent));
-                }
-              }
-            }
+            return snapshot.getJobUpdateDetails().stream()
+                .flatMap(details -> {
+                  Stream<Op> parent = Stream.of(Op.saveJobUpdate(
+                      new SaveJobUpdate().setJobUpdate(details.getDetails().getUpdate())));
+                  Stream<Op> jobEvents;
+                  if (details.getDetails().getUpdateEventsSize() > 0) {
+                    jobEvents = details.getDetails().getUpdateEvents().stream()
+                        .map(event -> Op.saveJobUpdateEvent(
+                            new SaveJobUpdateEvent()
+                                .setKey(details.getDetails().getUpdate().getSummary().getKey())
+                                .setEvent(event)));
+                  } else {
+                    jobEvents = Stream.empty();
+                  }
+
+                  Stream<Op> instanceEvents;
+                  if (details.getDetails().getInstanceEventsSize() > 0) {
+                    instanceEvents = details.getDetails().getInstanceEvents().stream()
+                        .map(event -> Op.saveJobInstanceUpdateEvent(
+                            new SaveJobInstanceUpdateEvent()
+                                .setKey(details.getDetails().getUpdate().getSummary().getKey())
+                                .setEvent(event)));
+                  } else {
+                    instanceEvents = Stream.empty();
+                  }
+
+                  return Streams.concat(parent, jobEvents, instanceEvents);
+                });
           }
+          return Stream.empty();
         }
       }
   );
 
   private final BuildInfo buildInfo;
   private final Clock clock;
-  private final Storage storage;
-  private final ThriftBackfill thriftBackfill;
 
   @Inject
-  public SnapshotStoreImpl(
-      BuildInfo buildInfo,
-      Clock clock,
-      @Volatile Storage storage,
-      ThriftBackfill thriftBackfill) {
-
+  public SnapshotStoreImpl(BuildInfo buildInfo, Clock clock) {
     this.buildInfo = requireNonNull(buildInfo);
     this.clock = requireNonNull(clock);
-    this.storage = requireNonNull(storage);
-    this.thriftBackfill = requireNonNull(thriftBackfill);
   }
 
-  private Snapshot createSnapshot(Storage anyStorage) {
-    // It's important to perform snapshot creation in a write lock to ensure all upstream callers
-    // are correctly synchronized (e.g. during backup creation).
-    return anyStorage.write(storeProvider -> {
-      Snapshot snapshot = new Snapshot();
-
-      // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
-      // one of the field closures is mean and tries to apply a timestamp.
-      long timestamp = clock.nowMillis();
-      for (SnapshotField field : snapshotFields) {
-        field.save(storeProvider, snapshot);
-      }
+  private Snapshot createSnapshot(StoreProvider storeProvider) {
+    Snapshot snapshot = new Snapshot();
+
+    // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
+    // one of the field closures is mean and tries to apply a timestamp.
+    long timestamp = clock.nowMillis();
+    for (SnapshotField field : snapshotFields) {
+      field.save(storeProvider, snapshot);
+    }
 
-      SchedulerMetadata metadata = new SchedulerMetadata()
-          .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orNull())
-          .setDetails(buildInfo.getProperties());
+    SchedulerMetadata metadata = new SchedulerMetadata()
+        .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orNull())
+        .setDetails(buildInfo.getProperties());
 
-      snapshot.setSchedulerMetadata(metadata);
-      snapshot.setTimestamp(timestamp);
-      return snapshot;
-    });
+    snapshot.setSchedulerMetadata(metadata);
+    snapshot.setTimestamp(timestamp);
+    return snapshot;
   }
 
   @Timed("snapshot_create")
   @Override
-  public Snapshot createSnapshot() {
-    return createSnapshot(storage);
+  public Snapshot from(StoreProvider stores) {
+    return createSnapshot(stores);
   }
 
   @Timed("snapshot_apply")
   @Override
-  public void applySnapshot(final Snapshot snapshot) {
+  public Stream<Op> asStream(Snapshot snapshot) {
     requireNonNull(snapshot);
 
-    storage.write((NoResult.Quiet) storeProvider -> {
-      LOG.info("Restoring snapshot.");
-
-      for (SnapshotField field : snapshotFields) {
-        field.restore(storeProvider, snapshot);
-      }
-    });
+    LOG.info("Restoring snapshot.");
+    return snapshotFields.stream()
+        .flatMap(field -> field.streamFrom(snapshot));
   }
 
   abstract class SnapshotField {
 
     abstract String getName();
 
-    abstract void saveToSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
+    abstract void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
 
-    abstract void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
+    abstract Stream<Op> doStreamFrom(Snapshot snapshot);
 
-    void save(MutableStoreProvider storeProvider, Snapshot snapshot) {
+    void save(StoreProvider storeProvider, Snapshot snapshot) {
       stats.getUnchecked(SNAPSHOT_SAVE + getName())
           .time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot));
     }
 
-    void restore(MutableStoreProvider storeProvider, Snapshot snapshot) {
-      stats.getUnchecked(SNAPSHOT_RESTORE + getName())
-          .time((Timeable.NoResult.Quiet) () -> restoreFromSnapshot(storeProvider, snapshot));
+    Stream<Op> streamFrom(Snapshot snapshot) {
+      return stats.getUnchecked(SNAPSHOT_RESTORE + getName()).time(() -> doStreamFrom(snapshot));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 159fb29..1b003ab 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -83,7 +83,7 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.UUIDGenerator;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -170,7 +170,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
   private final ConfigurationManager configurationManager;
   private final Thresholds thresholds;
   private final NonVolatileStorage storage;
-  private final DistributedSnapshotStore snapshotStore;
+  private final SnapshotStore snapshotStore;
   private final StorageBackup backup;
   private final Recovery recovery;
   private final MaintenanceController maintenance;
@@ -200,7 +200,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
       ConfigurationManager configurationManager,
       Thresholds thresholds,
       NonVolatileStorage storage,
-      DistributedSnapshotStore snapshotStore,
+      SnapshotStore snapshotStore,
       StorageBackup backup,
       Recovery recovery,
       CronJobManager cronJobManager,

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index aeb8685..020f348 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -35,7 +35,7 @@ import org.apache.aurora.scheduler.config.CommandLine;
 import org.apache.aurora.scheduler.mesos.DriverFactory;
 import org.apache.aurora.scheduler.mesos.DriverSettings;
 import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.mesos.SchedulerDriver;
@@ -83,7 +83,7 @@ public final class LocalSchedulerMain {
       protected void configure() {
         bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class));
         bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
-        bind(DistributedSnapshotStore.class).toInstance(new DistributedSnapshotStore() {
+        bind(SnapshotStore.class).toInstance(new SnapshotStore() {
           @Override
           public void snapshot() throws Storage.StorageException {
             // no-op

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 5cb5310..53a2315 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -171,7 +171,6 @@ public class CommandLineTest {
     expected.updater.enableAffinity = true;
     expected.updater.affinityExpiration = TEST_TIME;
     expected.state.taskAssignerModules = ImmutableList.of(NoopModule.class);
-    expected.logStorage.shutdownGracePeriod = TEST_TIME;
     expected.logStorage.snapshotInterval = TEST_TIME;
     expected.logStorage.maxLogEntrySize = TEST_DATA;
     expected.backup.backupInterval = TEST_TIME;
@@ -318,7 +317,6 @@ public class CommandLineTest {
         "-enable_update_affinity=true",
         "-update_affinity_reservation_hold_time=42days",
         "-task_assigner_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
-        "-dlog_shutdown_grace_period=42days",
         "-dlog_snapshot_interval=42days",
         "-dlog_max_entry_size=42GB",
         "-backup_interval=42days",

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
index 09560f4..ba03ff9 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
@@ -31,8 +31,8 @@ import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -49,6 +49,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
@@ -60,8 +61,8 @@ public class RecoveryTest extends EasyMockTest {
   private static final IScheduledTask TASK2 = TaskTestUtil.makeTask("task2", TaskTestUtil.JOB);
   private static final Snapshot SNAPSHOT1 = makeSnapshot(TASK1, TASK2);
 
-  private SnapshotStore<Snapshot> snapshotStore;
-  private DistributedSnapshotStore distributedStore;
+  private Snapshotter snapshotter;
+  private SnapshotStore distributedStore;
   private Storage primaryStorage;
   private MutableStoreProvider storeProvider;
   private Command shutDownNow;
@@ -74,8 +75,8 @@ public class RecoveryTest extends EasyMockTest {
   @Before
   public void setUp() throws IOException {
     final File backupDir = temporaryFolder.newFolder();
-    snapshotStore = createMock(new Clazz<SnapshotStore<Snapshot>>() { });
-    distributedStore = createMock(DistributedSnapshotStore.class);
+    snapshotter = createMock(Snapshotter.class);
+    distributedStore = createMock(SnapshotStore.class);
     primaryStorage = createMock(Storage.class);
     storeProvider = createMock(MutableStoreProvider.class);
     shutDownNow = createMock(Command.class);
@@ -84,7 +85,8 @@ public class RecoveryTest extends EasyMockTest {
     TemporaryStorageFactory factory =
         new TemporaryStorageFactory(TaskTestUtil.THRIFT_BACKFILL);
     storageBackup = new StorageBackupImpl(
-        snapshotStore,
+        primaryStorage,
+        snapshotter,
         clock,
         new BackupConfig(backupDir, 5, INTERVAL),
         executor);
@@ -94,7 +96,7 @@ public class RecoveryTest extends EasyMockTest {
 
   @Test
   public void testRecover() throws Exception {
-    expect(snapshotStore.createSnapshot()).andReturn(SNAPSHOT1);
+    expect(snapshotter.from(anyObject())).andReturn(SNAPSHOT1);
     Capture<MutateWork<Object, Exception>> transaction = createCapture();
     expect(primaryStorage.write(capture(transaction))).andReturn(null);
     Capture<Snapshot> snapshot = createCapture();
@@ -106,7 +108,7 @@ public class RecoveryTest extends EasyMockTest {
     assertEquals(ImmutableSet.of(), recovery.listBackups());
 
     clock.advance(INTERVAL);
-    storageBackup.createSnapshot();
+    storageBackup.from(storeProvider);
     String backup1 = storageBackup.createBackupName();
     assertEquals(ImmutableSet.of(backup1), recovery.listBackups());
 
@@ -122,7 +124,7 @@ public class RecoveryTest extends EasyMockTest {
 
   @Test
   public void testModifySnapshotBeforeCommit() throws Exception {
-    expect(snapshotStore.createSnapshot()).andReturn(SNAPSHOT1);
+    expect(snapshotter.from(anyObject())).andReturn(SNAPSHOT1);
     Snapshot modified = SNAPSHOT1.deepCopy().setTasks(ImmutableSet.of(TASK1.newBuilder()));
     Capture<MutateWork<Object, Exception>> transaction = createCapture();
     expect(primaryStorage.write(capture(transaction))).andReturn(null);
@@ -133,7 +135,7 @@ public class RecoveryTest extends EasyMockTest {
     control.replay();
 
     clock.advance(INTERVAL);
-    storageBackup.createSnapshot();
+    storageBackup.from(storeProvider);
     String backup1 = storageBackup.createBackupName();
     recovery.stage(backup1);
     assertEquals(

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java
index fff376f..f0ba33f 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java
@@ -39,9 +39,12 @@ import org.apache.aurora.gen.storage.QuotaConfiguration;
 import org.apache.aurora.gen.storage.SchedulerMetadata;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.StoredCronJob;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl.BackupConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.junit.Before;
 import org.junit.Rule;
@@ -49,6 +52,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.aggregate;
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -58,7 +62,8 @@ public class StorageBackupTest extends EasyMockTest {
   private static final int MAX_BACKUPS = 5;
   private static final Amount<Long, Time> INTERVAL = Amount.of(1L, Time.HOURS);
 
-  private SnapshotStore<Snapshot> delegate;
+  private Storage storage;
+  private Snapshotter delegate;
   private FakeClock clock;
   private BackupConfig config;
   private StorageBackupImpl storageBackup;
@@ -67,29 +72,35 @@ public class StorageBackupTest extends EasyMockTest {
 
   @Before
   public void setUp() throws IOException {
-    delegate = createMock(new Clazz<SnapshotStore<Snapshot>>() { });
+    storage = MemStorageModule.newEmptyStorage();
+    delegate = createMock(Snapshotter.class);
     final File backupDir = temporaryFolder.newFolder();
     ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
     clock = FakeScheduledExecutor.scheduleExecutor(executor);
     config = new BackupConfig(backupDir, MAX_BACKUPS, INTERVAL);
     clock.advance(Amount.of(365 * 30L, Time.DAYS));
-    storageBackup = new StorageBackupImpl(delegate, clock, config, executor);
+    storageBackup = new StorageBackupImpl(storage, delegate, clock, config, executor);
+  }
+
+  private void triggerSnapshot(Snapshot expectedResult) {
+    storage.write((NoResult.Quiet) stores ->
+        assertEquals(expectedResult, storageBackup.from(stores)));
   }
 
   @Test
   public void testBackup() throws Exception {
     Snapshot snapshot = makeSnapshot();
-    expect(delegate.createSnapshot()).andReturn(snapshot).times(3);
+    expect(delegate.from(anyObject())).andReturn(snapshot).times(3);
 
     control.replay();
 
-    assertEquals(snapshot, storageBackup.createSnapshot());
+    triggerSnapshot(snapshot);
     assertBackupCount(0);
     clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS) - 1, Time.MILLISECONDS));
-    assertEquals(snapshot, storageBackup.createSnapshot());
+    triggerSnapshot(snapshot);
     assertBackupCount(0);
     clock.advance(Amount.of(1L, Time.MILLISECONDS));
-    assertEquals(snapshot, storageBackup.createSnapshot());
+    triggerSnapshot(snapshot);
     assertBackupCount(1);
     assertEquals(1, storageBackup.getSuccesses().get());
 
@@ -104,34 +115,34 @@ public class StorageBackupTest extends EasyMockTest {
   @Test
   public void testDirectoryMissing() {
     Snapshot snapshot = makeSnapshot();
-    expect(delegate.createSnapshot()).andReturn(snapshot).times(1);
+    expect(delegate.from(anyObject())).andReturn(snapshot).times(1);
 
     control.replay();
 
     clock.advance(INTERVAL);
     config.getDir().delete();
-    assertEquals(snapshot, storageBackup.createSnapshot());
+    triggerSnapshot(snapshot);
     assertEquals(1, storageBackup.getFailures().get());
   }
 
   @Test
   public void testOldBackupsDeleted() {
     Snapshot snapshot = makeSnapshot();
-    expect(delegate.createSnapshot()).andReturn(snapshot).times(MAX_BACKUPS + 1);
+    expect(delegate.from(anyObject())).andReturn(snapshot).times(MAX_BACKUPS + 1);
 
     control.replay();
 
     ImmutableList.Builder<String> nameBuilder = ImmutableList.builder();
     for (int i = 0; i < MAX_BACKUPS; i++) {
       clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS), Time.MILLISECONDS));
-      assertEquals(snapshot, storageBackup.createSnapshot());
+      triggerSnapshot(snapshot);
       nameBuilder.add(storageBackup.createBackupName());
       assertBackupCount(i + 1);
       assertEquals(i + 1, storageBackup.getSuccesses().get());
     }
 
     clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS), Time.MILLISECONDS));
-    assertEquals(snapshot, storageBackup.createSnapshot());
+    triggerSnapshot(snapshot);
     nameBuilder.add(storageBackup.createBackupName());
     assertBackupCount(MAX_BACKUPS);
     assertEquals(MAX_BACKUPS + 1, storageBackup.getSuccesses().get());
@@ -150,17 +161,17 @@ public class StorageBackupTest extends EasyMockTest {
   public void testInterval() {
     // Ensures that a long initial interval does not result in shortened subsequent intervals.
     Snapshot snapshot = makeSnapshot();
-    expect(delegate.createSnapshot()).andReturn(snapshot).times(3);
+    expect(delegate.from(anyObject())).andReturn(snapshot).times(3);
 
     control.replay();
 
-    assertEquals(snapshot, storageBackup.createSnapshot());
+    triggerSnapshot(snapshot);
     assertBackupCount(0);
     clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS) * 3, Time.MILLISECONDS));
-    assertEquals(snapshot, storageBackup.createSnapshot());
+    triggerSnapshot(snapshot);
     assertBackupCount(1);
     assertEquals(1, storageBackup.getSuccesses().get());
-    assertEquals(snapshot, storageBackup.createSnapshot());
+    triggerSnapshot(snapshot);
     assertBackupCount(1);
     assertEquals(1, storageBackup.getSuccesses().get());
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
index 3ad40ad..a6bf330 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.storage.durability;
 
-import java.util.EnumSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
@@ -74,6 +73,7 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
@@ -167,44 +167,49 @@ public class DurableStorageTest extends EasyMockTest {
     // Populate all Op types.
     buildReplayOps();
 
+    storageUtil.expectStoreAccesses();
+
     control.replay();
 
     durableStorage.prepare();
     durableStorage.start(initializationLogic);
     assertTrue(initialized.get());
-
-    // Assert all Transaction types have handlers defined.
-    assertEquals(
-        EnumSet.allOf(Op._Fields.class),
-        EnumSet.copyOf(durableStorage.buildTransactionReplayActions().keySet()));
   }
 
   private void buildReplayOps() throws Exception {
-    ImmutableSet.Builder<Op> builder = ImmutableSet.builder();
+    ImmutableSet.Builder<Edit> builder = ImmutableSet.builder();
 
-    builder.add(Op.saveFrameworkId(new SaveFrameworkId("bob")));
+    builder.add(Edit.op(Op.saveFrameworkId(new SaveFrameworkId("bob"))));
     storageUtil.schedulerStore.saveFrameworkId("bob");
 
     JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig());
     JobConfiguration expectedJob =
         new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder());
     SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob);
-    builder.add(Op.saveCronJob(cronJob));
+    builder.add(Edit.op(Op.saveCronJob(cronJob)));
     storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob));
 
     RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
-    builder.add(Op.removeJob(removeJob));
+    builder.add(Edit.op(Op.removeJob(removeJob)));
     storageUtil.jobStore.removeJob(JOB_KEY);
 
     ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder();
     actualTask.getAssignedTask().setTask(nonBackfilledConfig());
     IScheduledTask expectedTask = makeTask("id", JOB_KEY);
     SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask));
-    builder.add(Op.saveTasks(saveTasks));
+    builder.add(Edit.op(Op.saveTasks(saveTasks)));
     storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask));
 
+    // Side-effects from a storage reset, caused by a snapshot.
+    builder.add(Edit.deleteAll());
+    storageUtil.jobStore.deleteJobs();
+    storageUtil.taskStore.deleteAllTasks();
+    storageUtil.quotaStore.deleteQuotas();
+    storageUtil.attributeStore.deleteHostAttributes();
+    storageUtil.jobUpdateStore.deleteAllUpdates();
+
     RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
-    builder.add(Op.removeTasks(removeTasks));
+    builder.add(Edit.op(Op.removeTasks(removeTasks)));
     storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
 
     ResourceAggregate nonBackfilled = new ResourceAggregate()
@@ -212,33 +217,33 @@ public class DurableStorageTest extends EasyMockTest {
         .setRamMb(32)
         .setDiskMb(64);
     SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled);
-    builder.add(Op.saveQuota(saveQuota));
+    builder.add(Edit.op(Op.saveQuota(saveQuota)));
     storageUtil.quotaStore.saveQuota(
         saveQuota.getRole(),
         IResourceAggregate.build(nonBackfilled.deepCopy()
             .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))));
 
-    builder.add(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole())));
+    builder.add(Edit.op(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole()))));
     storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
 
     // This entry lacks a slave ID, and should therefore be discarded.
     SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
         .setHost("host1")
         .setMode(MaintenanceMode.DRAINED));
-    builder.add(Op.saveHostAttributes(hostAttributes1));
+    builder.add(Edit.op(Op.saveHostAttributes(hostAttributes1)));
 
     SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
         .setHost("host2")
         .setSlaveId("slave2")
         .setMode(MaintenanceMode.DRAINED));
-    builder.add(Op.saveHostAttributes(hostAttributes2));
+    builder.add(Edit.op(Op.saveHostAttributes(hostAttributes2)));
     expect(storageUtil.attributeStore.saveHostAttributes(
         IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true);
 
-    builder.add(Op.saveLock(new SaveLock()));
+    builder.add(Edit.op(Op.saveLock(new SaveLock())));
     // TODO(jly): Deprecated, this is a no-op to be removed in 0.21. See AURORA-1959.
 
-    builder.add(Op.removeLock(new RemoveLock()));
+    builder.add(Edit.op(Op.removeLock(new RemoveLock())));
     // TODO(jly): Deprecated, this is a no-op to be removed in 0.21. See AURORA-1959.
 
     JobUpdate actualUpdate = new JobUpdate()
@@ -252,12 +257,12 @@ public class DurableStorageTest extends EasyMockTest {
     expectedUpdate.getInstructions().getInitialState()
         .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
     SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
-    builder.add(Op.saveJobUpdate(saveUpdate));
+    builder.add(Edit.op(Op.saveJobUpdate(saveUpdate)));
     storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));
 
     SaveJobUpdateEvent saveUpdateEvent =
         new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder());
-    builder.add(Op.saveJobUpdateEvent(saveUpdateEvent));
+    builder.add(Edit.op(Op.saveJobUpdateEvent(saveUpdateEvent)));
     storageUtil.jobUpdateStore.saveJobUpdateEvent(
         UPDATE_ID,
         IJobUpdateEvent.build(saveUpdateEvent.getEvent()));
@@ -265,16 +270,16 @@ public class DurableStorageTest extends EasyMockTest {
     SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent(
         new JobInstanceUpdateEvent(),
         UPDATE_ID.newBuilder());
-    builder.add(Op.saveJobInstanceUpdateEvent(saveInstanceEvent));
+    builder.add(Edit.op(Op.saveJobInstanceUpdateEvent(saveInstanceEvent)));
     storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
         UPDATE_ID,
         IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()));
 
-    builder.add(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L)));
+    builder.add(Edit.op(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L))));
     // No expectation - this op is ignored.
 
-    builder.add(Op.removeJobUpdate(
-        new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder()))));
+    builder.add(Edit.op(Op.removeJobUpdate(
+        new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder())))));
     storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID));
 
     expect(persistence.recover()).andReturn(builder.build().stream());

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
deleted file mode 100644
index e8b564b..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.durability;
-
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class WriteAheadStorageTest extends EasyMockTest {
-
-  private TransactionManager transactionManager;
-  private TaskStore.Mutable taskStore;
-  private AttributeStore.Mutable attributeStore;
-  private JobUpdateStore.Mutable jobUpdateStore;
-  private EventSink eventSink;
-  private WriteAheadStorage storage;
-
-  @Before
-  public void setUp() {
-    transactionManager = createMock(TransactionManager.class);
-    taskStore = createMock(TaskStore.Mutable.class);
-    attributeStore = createMock(AttributeStore.Mutable.class);
-    jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
-    eventSink = createMock(EventSink.class);
-
-    storage = new WriteAheadStorage(
-        transactionManager,
-        createMock(SchedulerStore.Mutable.class),
-        createMock(CronJobStore.Mutable.class),
-        taskStore,
-        createMock(QuotaStore.Mutable.class),
-        attributeStore,
-        jobUpdateStore,
-        LoggerFactory.getLogger(WriteAheadStorageTest.class),
-        eventSink);
-  }
-
-  private void expectOp(Op op) {
-    expect(transactionManager.hasActiveTransaction()).andReturn(true);
-    transactionManager.log(op);
-  }
-
-  @Test
-  public void testRemoveUpdates() {
-    Set<IJobUpdateKey> removed = ImmutableSet.of(
-        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")),
-        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b")));
-    jobUpdateStore.removeJobUpdates(removed);
-    // No operation is written since this Op is in read-only compatibility mode.
-
-    control.replay();
-
-    storage.removeJobUpdates(removed);
-  }
-
-  @Test
-  public void testMutate() {
-    String taskId = "a";
-    Function<IScheduledTask, IScheduledTask> mutator =
-        createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
-    Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
-
-    expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
-    expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
-
-    control.replay();
-
-    assertEquals(mutated, storage.mutateTask(taskId, mutator));
-  }
-
-  @Test
-  public void testSaveHostAttributes() {
-    IHostAttributes attributes = IHostAttributes.build(
-        new HostAttributes()
-            .setHost("a")
-            .setMode(MaintenanceMode.DRAINING)
-            .setAttributes(ImmutableSet.of(
-                new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
-
-    expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
-    expectOp(Op.saveHostAttributes(
-        new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
-    eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
-
-    expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
-
-    control.replay();
-
-    assertTrue(storage.saveHostAttributes(attributes));
-
-    assertFalse(storage.saveHostAttributes(attributes));
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteAllTasks() {
-    control.replay();
-    storage.deleteAllTasks();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteHostAttributes() {
-    control.replay();
-    storage.deleteHostAttributes();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteJobs() {
-    control.replay();
-    storage.deleteJobs();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteQuotas() {
-    control.replay();
-    storage.deleteQuotas();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteAllUpdatesAndEvents() {
-    control.replay();
-    storage.deleteAllUpdates();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java
new file mode 100644
index 0000000..1a89e83
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class WriteRecorderTest extends EasyMockTest {
+
+  private TransactionManager transactionManager;
+  private TaskStore.Mutable taskStore;
+  private AttributeStore.Mutable attributeStore;
+  private JobUpdateStore.Mutable jobUpdateStore;
+  private EventSink eventSink;
+  private WriteRecorder storage;
+
+  @Before
+  public void setUp() {
+    transactionManager = createMock(TransactionManager.class);
+    taskStore = createMock(TaskStore.Mutable.class);
+    attributeStore = createMock(AttributeStore.Mutable.class);
+    jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
+    eventSink = createMock(EventSink.class);
+
+    storage = new WriteRecorder(
+        transactionManager,
+        createMock(SchedulerStore.Mutable.class),
+        createMock(CronJobStore.Mutable.class),
+        taskStore,
+        createMock(QuotaStore.Mutable.class),
+        attributeStore,
+        jobUpdateStore,
+        LoggerFactory.getLogger(WriteRecorderTest.class),
+        eventSink);
+  }
+
+  private void expectOp(Op op) {
+    expect(transactionManager.hasActiveTransaction()).andReturn(true);
+    transactionManager.log(op);
+  }
+
+  @Test
+  public void testRemoveUpdates() {
+    Set<IJobUpdateKey> removed = ImmutableSet.of(
+        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")),
+        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b")));
+    jobUpdateStore.removeJobUpdates(removed);
+    // No operation is written since this Op is in read-only compatibility mode.
+
+    control.replay();
+
+    storage.removeJobUpdates(removed);
+  }
+
+  @Test
+  public void testMutate() {
+    String taskId = "a";
+    Function<IScheduledTask, IScheduledTask> mutator =
+        createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
+    Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
+
+    expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
+    expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+
+    control.replay();
+
+    assertEquals(mutated, storage.mutateTask(taskId, mutator));
+  }
+
+  @Test
+  public void testSaveHostAttributes() {
+    IHostAttributes attributes = IHostAttributes.build(
+        new HostAttributes()
+            .setHost("a")
+            .setMode(MaintenanceMode.DRAINING)
+            .setAttributes(ImmutableSet.of(
+                new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
+
+    expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
+    expectOp(Op.saveHostAttributes(
+        new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
+    eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
+
+    expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
+
+    control.replay();
+
+    assertTrue(storage.saveHostAttributes(attributes));
+
+    assertFalse(storage.saveHostAttributes(attributes));
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteAllTasks() {
+    control.replay();
+    storage.deleteAllTasks();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteHostAttributes() {
+    control.replay();
+    storage.deleteHostAttributes();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteJobs() {
+    control.replay();
+    storage.deleteJobs();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteQuotas() {
+    control.replay();
+    storage.deleteQuotas();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteAllUpdatesAndEvents() {
+    control.replay();
+    storage.deleteAllUpdates();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
new file mode 100644
index 0000000..3d6d555
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.aurora.codec.ThriftBinaryCodec;
+import org.apache.aurora.common.inject.Bindings;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.BuildInfo;
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.common.util.testing.FakeBuildInfo;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.storage.LogEntry;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.gen.storage.Transaction;
+import org.apache.aurora.scheduler.TierModule;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.log.Log;
+import org.apache.aurora.scheduler.log.Log.Entry;
+import org.apache.aurora.scheduler.log.Log.Stream;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
+import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class LogPersistenceTest extends EasyMockTest {
+  // Object under test, obtained from the injector assembled in setUp().
+  private Persistence persistence;
+  // Mocked log, and the stream the tests return from Log.open().
+  private Log mockLog;
+  private Stream mockStream;
+
+  @Before
+  public void setUp() {
+    mockLog = createMock(Log.class);
+    mockStream = createMock(Stream.class);
+
+    // Test-only bindings, installed alongside LogStorageModule, MemStorageModule and TierModule.
+    AbstractModule testBindings = new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+        bind(EventSink.class).toInstance(e -> { });
+        bind(BuildInfo.class).toInstance(FakeBuildInfo.generateBuildInfo());
+        bind(Clock.class).toInstance(new FakeClock());
+        bind(Snapshotter.class).to(SnapshotStoreImpl.class);
+        bind(Log.class).toInstance(mockLog);
+      }
+    };
+    Injector injector = Guice.createInjector(
+        new LogStorageModule(new Options()),
+        new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
+        new TierModule(TaskTestUtil.TIER_CONFIG),
+        testBindings);
+    persistence = injector.getInstance(Persistence.class);
+  }
+
+  @Test
+  public void testRecoverEmpty() throws Exception {
+    // A log with no entries should recover to an empty sequence of edits.
+    List<Entry> noEntries = ImmutableList.of();
+    expect(mockLog.open()).andReturn(mockStream);
+    expect(mockStream.readAll()).andReturn(noEntries.iterator());
+    control.replay();
+
+    persistence.prepare();
+    assertEquals(ImmutableList.of(), persistence.recover().collect(Collectors.toList()));
+  }
+
+  @Test
+  public void testRecoverSnapshot() throws Exception {
+    Op saveA = saveTaskOp("a");
+    Op saveB = saveTaskOp("b");
+    Op saveC = saveTaskOp("c");
+
+    // Log contents: a transaction, then a snapshot, then another transaction.
+    List<Entry> logContents = ImmutableList.of(
+        logEntry(LogEntry.transaction(new Transaction().setOps(ImmutableList.of(saveA)))),
+        logEntry(LogEntry.snapshot(new Snapshot().setTasks(saveB.getSaveTasks().getTasks()))),
+        logEntry(LogEntry.transaction(new Transaction().setOps(ImmutableList.of(saveC)))));
+    expect(mockLog.open()).andReturn(mockStream);
+    expect(mockStream.readAll()).andReturn(logContents.iterator());
+    control.replay();
+
+    // Expect edits in log order, with the snapshot surfacing as a delete-all followed
+    // by an op that saves its tasks.
+    persistence.prepare();
+    assertEquals(
+        ImmutableList.of(Edit.op(saveA), Edit.deleteAll(), Edit.op(saveB), Edit.op(saveC)),
+        persistence.recover().collect(Collectors.toList()));
+  }
+
+  // Builds a SaveTasks op holding a single test task with the given id.
+  private static Op saveTaskOp(String taskId) {
+    return Op.saveTasks(new SaveTasks().setTasks(ImmutableSet.of(
+        TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB).newBuilder())));
+  }
+
+  // Adapts a LogEntry into a log Entry backed by its ThriftBinaryCodec encoding.
+  private static Entry logEntry(LogEntry entry) {
+    return () -> ThriftBinaryCodec.encodeNonNull(entry);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
index eb966d7..fdde73d 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import com.google.inject.TypeLiteral;
 
 import org.apache.aurora.common.application.ShutdownRegistry;
 import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
@@ -35,15 +34,14 @@ import org.apache.aurora.common.util.BuildInfo;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.common.util.testing.FakeBuildInfo;
 import org.apache.aurora.common.util.testing.FakeClock;
-import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.TierModule;
 import org.apache.aurora.scheduler.config.types.DataAmount;
 import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.resources.ResourceTestUtil;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -62,7 +60,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
   private FakeLog log;
   private Runnable teardown = () -> { };
   private NonVolatileStorage storage;
-  private DistributedSnapshotStore snapshotStore;
+  private SnapshotStore snapshotStore;
 
   @Before
   public void setUp() {
@@ -92,12 +90,12 @@ public class NonVolatileStorageTest extends TearDownTestCase {
             bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
             bind(StatsProvider.class).toInstance(new FakeStatsProvider());
             bind(Log.class).toInstance(log);
-            bind(new TypeLiteral<SnapshotStore<Snapshot>>() { }).to(SnapshotStoreImpl.class);
+            bind(Snapshotter.class).to(SnapshotStoreImpl.class);
           }
         }
     );
     storage = injector.getInstance(NonVolatileStorage.class);
-    snapshotStore = injector.getInstance(DistributedSnapshotStore.class);
+    snapshotStore = injector.getInstance(SnapshotStore.class);
     storage.prepare();
     storage.start(w -> { });
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
new file mode 100644
index 0000000..270453d
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+
+import org.apache.aurora.GuavaUtils.ServiceManagerIface;
+import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
+import org.apache.aurora.common.application.ShutdownStage;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.inject.Bindings;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.SchedulerLifecycle.SchedulerActive;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.TierModule;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.log.Log;
+import org.apache.aurora.scheduler.log.Log.Entry;
+import org.apache.aurora.scheduler.log.Log.Position;
+import org.apache.aurora.scheduler.log.Log.Stream;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class SnapshotServiceTest extends EasyMockTest {
+
+  // Snapshot returned by the mocked Snapshotter and passed directly to snapshotWith().
+  private static final Snapshot SNAPSHOT = new Snapshot().setTasks(
+      ImmutableSet.of(TaskTestUtil.makeTask("a", TaskTestUtil.JOB).newBuilder()));
+
+  private NonVolatileStorage storage;
+  private SnapshotStore snapshotStore;
+  private ServiceManagerIface serviceManager;
+
+  private Snapshotter mockSnapshotter;
+  private Log mockLog;
+  private Stream mockStream;
+  private Position mockPosition;
+
+  // Invoked by each test (not a @Before method) so that it can pick the snapshot interval.
+  private void setUp(Amount<Long, Time> snapshotInterval) {
+    mockSnapshotter = createMock(Snapshotter.class);
+    mockLog = createMock(Log.class);
+    mockStream = createMock(Stream.class);
+    mockPosition = createMock(Position.class);
+
+    Options options = new Options();
+    options.snapshotInterval =
+        new TimeAmount(snapshotInterval.getValue(), snapshotInterval.getUnit());
+
+    Injector injector = Guice.createInjector(
+        new SchedulerServicesModule(),
+        new LogStorageModule(options),
+        new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
+        new TierModule(TaskTestUtil.TIER_CONFIG),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(Key.get(Command.class, ShutdownStage.class)).to(ShutdownRegistryImpl.class);
+            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+            bind(EventSink.class).toInstance(e -> { });
+            bind(Snapshotter.class).toInstance(mockSnapshotter);
+            bind(Log.class).toInstance(mockLog);
+          }
+        }
+    );
+
+    storage = injector.getInstance(NonVolatileStorage.class);
+    snapshotStore = injector.getInstance(SnapshotStore.class);
+    serviceManager =
+        injector.getInstance(Key.get(ServiceManagerIface.class, SchedulerActive.class));
+  }
+
+  // Expects the log to be opened and read once, yielding no entries.
+  private void expectStorageInitialized() throws Exception {
+    expect(mockLog.open()).andReturn(mockStream);
+    List<Entry> empty = ImmutableList.of();
+    expect(mockStream.readAll()).andReturn(empty.iterator());
+  }
+
+  // Expects one or more appends and truncations; every truncation counts the latch down.
+  private void expectSnapshotPersist(CountDownLatch latch) {
+    expect(mockStream.append(anyObject())).andReturn(mockPosition).atLeastOnce();
+    mockStream.truncateBefore(mockPosition);
+    expectLastCall().andAnswer((IAnswer<Void>) () -> {
+      latch.countDown();
+      return null;
+    }).atLeastOnce();
+  }
+
+  @Test
+  public void testPeriodicSnapshots() throws Exception {
+    setUp(Amount.of(1L, Time.MILLISECONDS));
+    expectStorageInitialized();
+    expect(mockSnapshotter.from(anyObject())).andReturn(SNAPSHOT).atLeastOnce();
+    CountDownLatch snapshotCalled = new CountDownLatch(2);
+    expectSnapshotPersist(snapshotCalled);
+    control.replay();
+
+    storage.prepare();
+    storage.start(stores -> { });
+    serviceManager.startAsync().awaitHealthy();
+    try {
+      // Bounded wait: a snapshot service that never fires fails the test instead of hanging.
+      if (!snapshotCalled.await(10, TimeUnit.SECONDS)) {
+        throw new AssertionError("Timed out waiting for two periodic snapshots");
+      }
+    } finally {
+      serviceManager.stopAsync().awaitStopped(10, TimeUnit.SECONDS);
+    }
+  }
+
+  @Test
+  public void testExplicitInternalSnapshot() throws Exception {
+    setUp(Amount.of(1L, Time.HOURS));
+    expectStorageInitialized();
+    expect(mockSnapshotter.from(anyObject())).andReturn(SNAPSHOT);
+    expectSnapshotPersist(new CountDownLatch(1));
+    control.replay();
+
+    storage.prepare();
+    storage.start(stores -> { });
+    snapshotStore.snapshot();
+  }
+
+  @Test
+  public void testExplicitProvidedSnapshot() throws Exception {
+    setUp(Amount.of(1L, Time.HOURS));
+    // mockSnapshotter has no expectations here since the snapshot is supplied by the test.
+    expectStorageInitialized();
+    expectSnapshotPersist(new CountDownLatch(1));
+    control.replay();
+
+    storage.prepare();
+    storage.start(stores -> { });
+    snapshotStore.snapshotWith(SNAPSHOT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
index 5634f92..2ad4e84 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
@@ -18,12 +18,8 @@ import java.util.Map;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
 
 import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.testing.FakeBuildInfo;
 import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.gen.Attribute;
@@ -55,6 +51,9 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.durability.Loader;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
 import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -65,10 +64,10 @@ import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.junit.Test;
 
 import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.THRIFT_BACKFILL;
 import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag;
 import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.SNAPSHOT_RESTORE;
 import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.SNAPSHOT_SAVE;
@@ -80,35 +79,27 @@ public class SnapshotStoreImplIT {
   private static final long NOW = 10335463456L;
   private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "job");
 
-  private SnapshotStoreImpl snapshotStore;
+  private Storage storage;
+  private SnapshotStoreImpl snapshotter;
 
   private void setUpStore() {
-    Injector injector = Guice.createInjector(
-        new MemStorageModule(),
-        new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
-          }
-        });
-
+    storage = MemStorageModule.newEmptyStorage();
     FakeClock clock = new FakeClock();
     clock.setNowMillis(NOW);
-    snapshotStore = new SnapshotStoreImpl(
-        generateBuildInfo(),
-        clock,
-        injector.getInstance(Storage.class),
-        TaskTestUtil.THRIFT_BACKFILL);
+    snapshotter = new SnapshotStoreImpl(generateBuildInfo(), clock);
     Stats.flush();
   }
 
   @Test
   public void testBackfill() {
     setUpStore();
-    snapshotStore.applySnapshot(makeNonBackfilled());
+    storage.write((NoResult.Quiet) stores ->
+        Loader.load(
+            stores,
+            THRIFT_BACKFILL,
+            snapshotter.asStream(makeNonBackfilled()).map(Edit::op)));
 
-    Snapshot backfilled = snapshotStore.createSnapshot();
-    assertEquals(expected(), backfilled);
+    assertEquals(expected(), storage.write(snapshotter::from));
     assertSnapshotRestoreStats(1L);
     assertSnapshotSaveStats(1L);
   }
@@ -183,14 +174,14 @@ public class SnapshotStoreImplIT {
   }
 
   private void assertSnapshotSaveStats(long count) {
-    for (String stat : snapshotStore.snapshotFieldNames()) {
+    for (String stat : snapshotter.snapshotFieldNames()) {
       assertEquals(count, Stats.getVariable(SNAPSHOT_SAVE + stat + "_events").read());
       assertNotNull(Stats.getVariable(SNAPSHOT_SAVE + stat + "_nanos_total"));
     }
   }
 
   private void assertSnapshotRestoreStats(long count) {
-    for (String stat : snapshotStore.snapshotFieldNames()) {
+    for (String stat : snapshotter.snapshotFieldNames()) {
       assertEquals(count, Stats.getVariable(SNAPSHOT_RESTORE + stat + "_events").read());
       assertNotNull(Stats.getVariable(SNAPSHOT_RESTORE + stat + "_nanos_total"));
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
index fd81bff..64fbb54 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -83,9 +83,9 @@ public class StorageTestUtil {
   }
 
   /**
-   * Expects any number of read or write operations.
+   * Expects any number of calls to fetch individual stores.
    */
-  public void expectOperations() {
+  public void expectStoreAccesses() {
     expect(storeProvider.getTaskStore()).andReturn(taskStore).anyTimes();
     expect(storeProvider.getQuotaStore()).andReturn(quotaStore).anyTimes();
     expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
@@ -99,6 +99,13 @@ public class StorageTestUtil {
     expect(mutableStoreProvider.getCronJobStore()).andReturn(jobStore).anyTimes();
     expect(mutableStoreProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
     expect(mutableStoreProvider.getJobUpdateStore()).andReturn(jobUpdateStore).anyTimes();
+  }
+
+  /**
+   * Expects any number of read or write operations.
+   */
+  public void expectOperations() {
+    expectStoreAccesses();
     expectRead().anyTimes();
     expectWrite().anyTimes();
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 919ac14..040baf9 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -84,7 +84,7 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.UUIDGenerator;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup;
@@ -178,7 +178,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
       ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2"));
 
   private StorageTestUtil storageUtil;
-  private DistributedSnapshotStore snapshotStore;
+  private SnapshotStore snapshotStore;
   private StorageBackup backup;
   private Recovery recovery;
   private MaintenanceController maintenance;
@@ -197,7 +197,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void setUp() throws Exception {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
-    snapshotStore = createMock(DistributedSnapshotStore.class);
+    snapshotStore = createMock(SnapshotStore.class);
     backup = createMock(StorageBackup.class);
     recovery = createMock(Recovery.class);
     maintenance = createMock(MaintenanceController.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index bb0fd89..231fd8d 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -62,7 +62,7 @@ import org.apache.aurora.scheduler.quota.QuotaModule;
 import org.apache.aurora.scheduler.resources.ResourceTestUtil;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.stats.StatsModule;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
@@ -136,7 +136,7 @@ public class ThriftIT extends EasyMockTest {
             bind(FrameworkInfoFactoryImpl.class).in(Singleton.class);
             bindMock(Recovery.class);
             bindMock(StorageBackup.class);
-            bindMock(DistributedSnapshotStore.class);
+            bindMock(SnapshotStore.class);
             bind(IServerInfo.class).toInstance(SERVER_INFO);
           }
 


[2/2] aurora git commit: Recover snapshots via the Op stream

Posted by wf...@apache.org.
Recover snapshots via the Op stream

This cleans up the various interfaces around persisting and recovering from
`Snapshot`s.  Most importantly, `LogPersistence` no longer bypasses the
`recover()` `Op` stream to apply snapshots.  As a result, it should be
straightforward to build a migration utility that clones `LogPersistence`
state into another `Persistence` implementation.

Reviewed at https://reviews.apache.org/r/64286/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5f79f7ca
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5f79f7ca
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5f79f7ca

Branch: refs/heads/master
Commit: 5f79f7ca7c62f053f66a9ea925cebb78a644ce54
Parents: 4489dc3
Author: Bill Farner <wf...@apache.org>
Authored: Wed Dec 13 20:37:57 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Dec 13 20:37:57 2017 -0800

----------------------------------------------------------------------
 .../aurora/benchmark/SnapshotBenchmarks.java    |   4 +-
 .../storage/DistributedSnapshotStore.java       |  39 --
 .../aurora/scheduler/storage/SnapshotStore.java |  25 +-
 .../aurora/scheduler/storage/Snapshotter.java   |  43 +++
 .../scheduler/storage/backup/BackupModule.java  |  14 +-
 .../scheduler/storage/backup/Recovery.java      |  10 +-
 .../scheduler/storage/backup/StorageBackup.java |  27 +-
 .../storage/backup/TemporaryStorage.java        |  21 +-
 .../storage/durability/DurableStorage.java      | 155 +-------
 .../scheduler/storage/durability/Loader.java    | 150 ++++++++
 .../storage/durability/Persistence.java         |  56 ++-
 .../storage/durability/WriteAheadStorage.java   | 368 -------------------
 .../storage/durability/WriteRecorder.java       | 368 +++++++++++++++++++
 .../scheduler/storage/log/LogPersistence.java   | 206 ++---------
 .../scheduler/storage/log/LogStorageModule.java |  86 ++---
 .../scheduler/storage/log/SnapshotService.java  | 121 ++++++
 .../storage/log/SnapshotStoreImpl.java          | 236 ++++++------
 .../thrift/SchedulerThriftInterface.java        |   6 +-
 .../scheduler/app/local/LocalSchedulerMain.java |   4 +-
 .../scheduler/config/CommandLineTest.java       |   2 -
 .../scheduler/storage/backup/RecoveryTest.java  |  22 +-
 .../storage/backup/StorageBackupTest.java       |  45 ++-
 .../storage/durability/DurableStorageTest.java  |  53 +--
 .../durability/WriteAheadStorageTest.java       | 166 ---------
 .../storage/durability/WriteRecorderTest.java   | 166 +++++++++
 .../storage/log/LogPersistenceTest.java         | 134 +++++++
 .../storage/log/NonVolatileStorageTest.java     |  10 +-
 .../storage/log/SnapshotServiceTest.java        | 174 +++++++++
 .../storage/log/SnapshotStoreImplIT.java        |  41 +--
 .../storage/testing/StorageTestUtil.java        |  11 +-
 .../thrift/SchedulerThriftInterfaceTest.java    |   6 +-
 .../aurora/scheduler/thrift/ThriftIT.java       |   4 +-
 32 files changed, 1578 insertions(+), 1195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
index 755582d..4f99f80 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
@@ -75,7 +75,7 @@ public class SnapshotBenchmarks {
 
     @Benchmark
     public boolean run() throws TException {
-      snapshotStore.applySnapshot(snapshot);
+      snapshotStore.asStream(snapshot);
       // Return non-guessable result to satisfy "blackhole" requirement.
       return System.currentTimeMillis() % 5 == 0;
     }
@@ -103,7 +103,7 @@ public class SnapshotBenchmarks {
           .setNumInstanceEvents(instanceEvents)
           .build(updates));
 
-      return snapshotStore.createSnapshot();
+      return storage.write(snapshotStore::from);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
deleted file mode 100644
index 0c6a955..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage;
-
-import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-
-/**
- * A distributed snapshot store that supports persisting globally-visible snapshots.
- */
-public interface DistributedSnapshotStore {
-
-  /**
-   * Clean up the underlying storage by optimizing internal data structures. Does not change
-   * externally-visible state but might not run concurrently with write operations.
-   */
-  void snapshot() throws StorageException;
-
-  /**
-   * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an
-   * internally-generated one based on the current state.
-   *
-   * @param snapshot Snapshot to write.
-   * @throws CodingException If the snapshot could not be serialized.
-   */
-  void snapshotWith(Snapshot snapshot) throws CodingException;
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
index 6b5e5dd..ab109ab 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
@@ -13,24 +13,27 @@
  */
 package org.apache.aurora.scheduler.storage;
 
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+
 /**
- * Storage mechanism that is able to create complete snapshots of the local storage system state
- * and apply these to restore local storage from a snapshotted baseline.
+ * A storage component that writes full-state snapshots.
  */
-public interface SnapshotStore<T> {
+public interface SnapshotStore {
 
   /**
-   * Creates a consistent snapshot of the local storage system.
-   *
-   * @return A blob that can be used to recover local storage via {@link #applySnapshot(Object)}.
+   * Clean up the underlying storage by optimizing internal data structures. Does not change
+   * externally-visible state but might not run concurrently with write operations.
    */
-  T createSnapshot();
+  void snapshot() throws StorageException;
 
   /**
-   * Applies a snapshot blob to the local storage system, wiping out all existing data and
-   * resetting with the contents of the snapshot.
+   * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an
+   * internally-generated one based on the current state.
    *
-   * @param snapshot A snapshot blob created by {@link #createSnapshot()}.
+   * @param snapshot Snapshot to write.
+   * @throws CodingException If the snapshot could not be serialized.
    */
-  void applySnapshot(T snapshot);
+  void snapshotWith(Snapshot snapshot) throws CodingException;
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java b/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java
new file mode 100644
index 0000000..0966faf
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage;
+
+import java.util.stream.Stream;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+
+/**
+ * Logic to convert storage contents into a snapshot, and a snapshot into a stream of storage
+ * operations.
+ */
+public interface Snapshotter {
+
+  /**
+   * Creates a snapshot from the contents of storage.
+   *
+   * @param stores stores to create a snapshot from.
+   * @return A snapshot that can be used to recover storage.
+   */
+  Snapshot from(StoreProvider stores);
+
+  /**
+   * Converts a snapshot into an equivalent linear stream of storage operations.
+   *
+   * @param snapshot A snapshot created by {@link #from(StoreProvider)}.
+   * @return a stream of operations representing the contents of the snapshot.
+   */
+  Stream<Op> asStream(Snapshot snapshot);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
index 7eaae89..4397c1e 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
@@ -32,7 +32,7 @@ import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.config.types.TimeAmount;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
 import org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryImpl;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl.BackupConfig;
@@ -66,9 +66,9 @@ public class BackupModule extends PrivateModule {
   }
 
   private final Options options;
-  private final Class<? extends SnapshotStore<Snapshot>> snapshotStore;
+  private final Class<? extends Snapshotter> snapshotStore;
 
-  public BackupModule(Options options, Class<? extends SnapshotStore<Snapshot>> snapshotStore) {
+  public BackupModule(Options options, Class<? extends Snapshotter> snapshotStore) {
     this.options = options;
     this.snapshotStore = snapshotStore;
   }
@@ -78,13 +78,13 @@ public class BackupModule extends PrivateModule {
     Executor executor = AsyncUtil.singleThreadLoggingScheduledExecutor("StorageBackup-%d", LOG);
     bind(Executor.class).toInstance(executor);
 
-    TypeLiteral<SnapshotStore<Snapshot>> type = new TypeLiteral<SnapshotStore<Snapshot>>() { };
-    bind(type).annotatedWith(StorageBackupImpl.SnapshotDelegate.class).to(snapshotStore);
+    bind(Snapshotter.class).annotatedWith(StorageBackupImpl.SnapshotDelegate.class)
+        .to(snapshotStore);
 
-    bind(type).to(StorageBackupImpl.class);
+    bind(Snapshotter.class).to(StorageBackupImpl.class);
     bind(StorageBackup.class).to(StorageBackupImpl.class);
     bind(StorageBackupImpl.class).in(Singleton.class);
-    expose(type);
+    expose(Snapshotter.class);
     expose(StorageBackup.class);
 
     bind(new TypeLiteral<Function<Snapshot, TemporaryStorage>>() { })

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
index 3a62f02..79899a0 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -31,7 +31,7 @@ import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.common.base.Command;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -110,7 +110,7 @@ public interface Recovery {
     private final Function<Snapshot, TemporaryStorage> tempStorageFactory;
     private final AtomicReference<PendingRecovery> recovery;
     private final Storage primaryStorage;
-    private final DistributedSnapshotStore distributedStore;
+    private final SnapshotStore snapshotStore;
     private final Command shutDownNow;
 
     @Inject
@@ -118,14 +118,14 @@ public interface Recovery {
         File backupDir,
         Function<Snapshot, TemporaryStorage> tempStorageFactory,
         Storage primaryStorage,
-        DistributedSnapshotStore distributedStore,
+        SnapshotStore snapshotStore,
         Command shutDownNow) {
 
       this.backupDir = requireNonNull(backupDir);
       this.tempStorageFactory = requireNonNull(tempStorageFactory);
       this.recovery = Atomics.newReference();
       this.primaryStorage = requireNonNull(primaryStorage);
-      this.distributedStore = requireNonNull(distributedStore);
+      this.snapshotStore = requireNonNull(snapshotStore);
       this.shutDownNow = requireNonNull(shutDownNow);
     }
 
@@ -197,7 +197,7 @@ public interface Recovery {
       void commit() {
         primaryStorage.write((NoResult.Quiet) storeProvider -> {
           try {
-            distributedStore.snapshotWith(tempStorage.toSnapshot());
+            snapshotStore.snapshotWith(tempStorage.toSnapshot());
             shutDownNow.execute();
           } catch (CodingException e) {
             throw new IllegalStateException("Failed to encode snapshot.", e);

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
index 2d61678..1675893 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.inject.Qualifier;
@@ -42,8 +43,11 @@ import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.Stats;
 import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.gen.storage.Op;
 import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -69,7 +73,7 @@ public interface StorageBackup {
    */
   void backupNow();
 
-  class StorageBackupImpl implements StorageBackup, SnapshotStore<Snapshot> {
+  class StorageBackupImpl implements StorageBackup, Snapshotter {
     private static final Logger LOG = LoggerFactory.getLogger(StorageBackupImpl.class);
 
     private static final String FILE_PREFIX = "scheduler-backup-";
@@ -93,13 +97,14 @@ public interface StorageBackup {
     }
 
     /**
-     * Binding annotation that the underlying {@link SnapshotStore} must be bound with.
+     * Binding annotation that the underlying {@link Snapshotter} must be bound with.
      */
     @Qualifier
     @Target({FIELD, PARAMETER, METHOD}) @Retention(RUNTIME)
     @interface SnapshotDelegate { }
 
-    private final SnapshotStore<Snapshot> delegate;
+    private final Storage storage;
+    private final Snapshotter delegate;
     private final Clock clock;
     private final long backupIntervalMs;
     private volatile long lastBackupMs;
@@ -120,11 +125,13 @@ public interface StorageBackup {
 
     @Inject
     StorageBackupImpl(
-        @SnapshotDelegate SnapshotStore<Snapshot> delegate,
+        Storage storage,
+        @SnapshotDelegate Snapshotter delegate,
         Clock clock,
         BackupConfig config,
         Executor executor) {
 
+      this.storage = requireNonNull(storage);
       this.delegate = requireNonNull(delegate);
       this.clock = requireNonNull(clock);
       this.config = requireNonNull(config);
@@ -135,8 +142,8 @@ public interface StorageBackup {
     }
 
     @Override
-    public Snapshot createSnapshot() {
-      final Snapshot snapshot = delegate.createSnapshot();
+    public Snapshot from(StoreProvider stores) {
+      Snapshot snapshot = delegate.from(stores);
       if (clock.nowMillis() >= (lastBackupMs + backupIntervalMs)) {
         executor.execute(() -> save(snapshot));
       }
@@ -145,7 +152,7 @@ public interface StorageBackup {
 
     @Override
     public void backupNow() {
-      save(delegate.createSnapshot());
+      save(storage.write(delegate::from));
     }
 
     @VisibleForTesting
@@ -210,8 +217,8 @@ public interface StorageBackup {
     static final Function<File, String> FILE_NAME = File::getName;
 
     @Override
-    public void applySnapshot(Snapshot snapshot) {
-      delegate.applySnapshot(snapshot);
+    public Stream<Op> asStream(Snapshot snapshot) {
+      return delegate.asStream(snapshot);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 18296b0..0305d9d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -24,9 +24,11 @@ import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.durability.Loader;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
 import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
@@ -78,16 +80,15 @@ interface TemporaryStorage {
 
     @Override
     public TemporaryStorage apply(Snapshot snapshot) {
-      final Storage storage = MemStorageModule.newEmptyStorage();
-      final BuildInfo buildInfo = generateBuildInfo();
+      Storage storage = MemStorageModule.newEmptyStorage();
+      BuildInfo buildInfo = generateBuildInfo();
       FakeClock clock = new FakeClock();
       clock.setNowMillis(snapshot.getTimestamp());
-      final SnapshotStore<Snapshot> snapshotStore = new SnapshotStoreImpl(
-          buildInfo,
-          clock,
-          storage,
-          thriftBackfill);
-      snapshotStore.applySnapshot(snapshot);
+      Snapshotter snapshotter = new SnapshotStoreImpl(buildInfo, clock);
+
+      storage.write((NoResult.Quiet) stores -> {
+        Loader.load(stores, thriftBackfill, snapshotter.asStream(snapshot).map(Edit::op));
+      });
 
       return new TemporaryStorage() {
         @Override
@@ -107,7 +108,7 @@ interface TemporaryStorage {
 
         @Override
         public Snapshot toSnapshot() {
-          return snapshotStore.createSnapshot();
+          return storage.write(snapshotter::from);
         }
       };
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
index 6a7c0ad..f1fdc27 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
@@ -14,23 +14,13 @@
 package org.apache.aurora.scheduler.storage.durability;
 
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
 
 import javax.inject.Inject;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.stats.SlidingStats;
-import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -43,12 +33,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
@@ -101,32 +85,17 @@ public class DurableStorage implements NonVolatileStorage {
     void log(Op op);
   }
 
-  private static final Logger LOG = LoggerFactory.getLogger(DurableStorage.class);
-
   private final Persistence persistence;
   private final Storage writeBehindStorage;
-  private final SchedulerStore.Mutable writeBehindSchedulerStore;
-  private final CronJobStore.Mutable writeBehindJobStore;
-  private final TaskStore.Mutable writeBehindTaskStore;
-  private final QuotaStore.Mutable writeBehindQuotaStore;
-  private final AttributeStore.Mutable writeBehindAttributeStore;
-  private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
   private final ReentrantLock writeLock;
   private final ThriftBackfill thriftBackfill;
 
-  private final WriteAheadStorage writeAheadStorage;
+  private final WriteRecorder writeRecorder;
 
-  // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
-  // recovering are controlled at this layer (they're all calls to Mutable store implementations).
-  // The more involved change is changing SnapshotStore to accept a Mutable store provider to
-  // avoid a call to Storage.write() when we replay a Snapshot.
-  private boolean recovered = false;
   private TransactionRecorder transaction = null;
 
   private final SlidingStats writerWaitStats = new SlidingStats("storage_write_lock_wait", "ns");
 
-  private final Map<Op._Fields, Consumer<Op>> transactionReplayActions;
-
   @Inject
   DurableStorage(
       Persistence persistence,
@@ -147,12 +116,6 @@ public class DurableStorage implements NonVolatileStorage {
     // we write directly to the writeBehind stores since we are replaying what's already persisted.
     // After that, all writes must succeed in Persistence before they may be considered successful.
     this.writeBehindStorage = requireNonNull(delegateStorage);
-    this.writeBehindSchedulerStore = requireNonNull(schedulerStore);
-    this.writeBehindJobStore = requireNonNull(jobStore);
-    this.writeBehindTaskStore = requireNonNull(taskStore);
-    this.writeBehindQuotaStore = requireNonNull(quotaStore);
-    this.writeBehindAttributeStore = requireNonNull(attributeStore);
-    this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
     this.writeLock = requireNonNull(writeLock);
     this.thriftBackfill = requireNonNull(thriftBackfill);
     TransactionManager transactionManager = new TransactionManager() {
@@ -166,7 +129,7 @@ public class DurableStorage implements NonVolatileStorage {
         transaction.add(op);
       }
     };
-    this.writeAheadStorage = new WriteAheadStorage(
+    this.writeRecorder = new WriteRecorder(
         transactionManager,
         schedulerStore,
         jobStore,
@@ -174,81 +137,8 @@ public class DurableStorage implements NonVolatileStorage {
         quotaStore,
         attributeStore,
         jobUpdateStore,
-        LoggerFactory.getLogger(WriteAheadStorage.class),
+        LoggerFactory.getLogger(WriteRecorder.class),
         eventSink);
-
-    this.transactionReplayActions = buildTransactionReplayActions();
-  }
-
-  @VisibleForTesting
-  final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() {
-    return ImmutableMap.<Op._Fields, Consumer<Op>>builder()
-        .put(
-            Op._Fields.SAVE_FRAMEWORK_ID,
-            op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
-        .put(Op._Fields.SAVE_CRON_JOB, op -> {
-          SaveCronJob cronJob = op.getSaveCronJob();
-          writeBehindJobStore.saveAcceptedJob(
-              thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig()));
-        })
-        .put(
-            Op._Fields.REMOVE_JOB,
-            op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())))
-        .put(
-            Op._Fields.SAVE_TASKS,
-            op -> writeBehindTaskStore.saveTasks(
-                thriftBackfill.backfillTasks(op.getSaveTasks().getTasks())))
-        .put(
-            Op._Fields.REMOVE_TASKS,
-            op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()))
-        .put(Op._Fields.SAVE_QUOTA, op -> {
-          SaveQuota saveQuota = op.getSaveQuota();
-          writeBehindQuotaStore.saveQuota(
-              saveQuota.getRole(),
-              ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
-        })
-        .put(
-            Op._Fields.REMOVE_QUOTA,
-            op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()))
-        .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> {
-          HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
-          // Prior to commit 5cf760b, the store would persist maintenance mode changes for
-          // unknown hosts.  5cf760b began rejecting these, but the storage may still
-          // contain entries with a null slave ID.
-          if (attributes.isSetSlaveId()) {
-            writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
-          } else {
-            LOG.info("Dropping host attributes with no agent ID: " + attributes);
-          }
-        })
-        .put(
-            Op._Fields.SAVE_LOCK, // TODO(jly): Deprecated, remove in 0.21. See AURORA-1959.
-            op -> { /* no-op */ })
-        .put(
-            Op._Fields.REMOVE_LOCK, // TODO(jly): Deprecated, remove in 0.21. See AURORA-1959.
-            op -> { /* no-op */ })
-        .put(Op._Fields.SAVE_JOB_UPDATE, op ->
-          writeBehindJobUpdateStore.saveJobUpdate(
-              thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())))
-        .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
-          SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
-          writeBehindJobUpdateStore.saveJobUpdateEvent(
-              IJobUpdateKey.build(event.getKey()),
-              IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
-        })
-        .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> {
-          SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
-          writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
-              IJobUpdateKey.build(event.getKey()),
-              IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
-        })
-        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
-          LOG.info("Dropping prune operation.  Updates will be pruned later.");
-        })
-        .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
-          writeBehindJobUpdateStore.removeJobUpdates(
-              IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
-        .build();
   }
 
   @Override
@@ -260,18 +150,18 @@ public class DurableStorage implements NonVolatileStorage {
 
   @Override
   @Timed("scheduler_storage_start")
-  public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
-    write((NoResult.Quiet) unused -> {
-      // Must have the underlying storage started so we can query it.
-      // We replay these entries in the forwarded storage system's transactions but not ours - we
-      // do not want to re-record these ops.
-      recover();
-      recovered = true;
+  public void start(final MutateWork.NoResult.Quiet initializationLogic) {
+    writeLock.lock();
+    try {
+      // We recover directly into the forwarded system to avoid persisting replayed operations.
+      writeBehindStorage.write((NoResult.Quiet) this::recover);
 
       // Now that we're recovered we should persist any mutations done in initializationLogic, so
       // run it in one of our transactions.
       write(initializationLogic);
-    });
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
@@ -280,9 +170,9 @@ public class DurableStorage implements NonVolatileStorage {
   }
 
   @Timed("scheduler_storage_recover")
-  void recover() throws RecoveryFailedException {
+  void recover(MutableStoreProvider stores) throws RecoveryFailedException {
     try {
-      persistence.recover().forEach(DurableStorage.this::replayOp);
+      Loader.load(stores, thriftBackfill, persistence.recover());
     } catch (PersistenceException e) {
       throw new RecoveryFailedException(e);
     }
@@ -294,28 +184,19 @@ public class DurableStorage implements NonVolatileStorage {
     }
   }
 
-  private void replayOp(Op op) {
-    Op._Fields opField = op.getSetField();
-    if (!transactionReplayActions.containsKey(opField)) {
-      throw new IllegalStateException("Unknown transaction op: " + opField);
-    }
-
-    transactionReplayActions.get(opField).accept(op);
-  }
-
   private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
       throws StorageException, E {
 
     // The transaction has already been set up so we just need to delegate with our store provider
     // so any mutations may be persisted.
     if (transaction != null) {
-      return work.apply(writeAheadStorage);
+      return work.apply(writeRecorder);
     }
 
     transaction = new TransactionRecorder();
     try {
       return writeBehindStorage.write(unused -> {
-        T result = work.apply(writeAheadStorage);
+        T result = work.apply(writeRecorder);
         List<Op> ops = transaction.getOps();
         if (!ops.isEmpty()) {
           try {
@@ -337,12 +218,6 @@ public class DurableStorage implements NonVolatileStorage {
     writeLock.lock();
     try {
       writerWaitStats.accumulate(System.nanoTime() - waitStart);
-      // We don't want to persist when recovering, we just want to update the underlying
-      // store - so pass mutations straight through to the underlying storage.
-      if (!recovered) {
-        return writeBehindStorage.write(work);
-      }
-
       return doInTransaction(work);
     } finally {
       writeLock.unlock();

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java
new file mode 100644
index 0000000..10864f1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.stream.Stream;
+
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class Loader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
+
+  private Loader() {
+    // Utility class.
+  }
+
+  /**
+   * Loads a sequence of edits into the provided stores, applying backfills.
+   *
+   * @param stores Stores to populate.
+   * @param backfill Backfill mechanism to use.
+   * @param edits Edits to apply: either a single op to replay, or a delete-all store reset.
+   */
+  public static void load(
+      MutableStoreProvider stores,
+      ThriftBackfill backfill,
+      Stream<Edit> edits) {
+
+    edits.forEach(edit -> load(stores, backfill, edit));
+  }
+
+  private static void load(MutableStoreProvider stores, ThriftBackfill backfill, Edit edit) {
+    if (edit.isDeleteAll()) {
+      LOG.info("Resetting storage");
+      stores.getCronJobStore().deleteJobs();
+      stores.getUnsafeTaskStore().deleteAllTasks();
+      stores.getQuotaStore().deleteQuotas();
+      stores.getAttributeStore().deleteHostAttributes();
+      stores.getJobUpdateStore().deleteAllUpdates();
+      return;
+    }
+
+    Op op = edit.getOp();
+    switch (op.getSetField()) {
+      case SAVE_FRAMEWORK_ID:
+        stores.getSchedulerStore().saveFrameworkId(op.getSaveFrameworkId().getId());
+        break;
+
+      case SAVE_CRON_JOB:
+        stores.getCronJobStore().saveAcceptedJob(
+            backfill.backfillJobConfiguration(op.getSaveCronJob().getJobConfig()));
+        break;
+
+      case REMOVE_JOB:
+        stores.getCronJobStore().removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
+        break;
+
+      case REMOVE_LOCK:
+      case SAVE_LOCK:
+        // TODO(jly): Deprecated, remove in 0.21. See AURORA-1959.
+        break;
+
+      case SAVE_TASKS:
+        stores.getUnsafeTaskStore().saveTasks(backfill.backfillTasks(op.getSaveTasks().getTasks()));
+        break;
+
+      case REMOVE_TASKS:
+        stores.getUnsafeTaskStore().deleteTasks(op.getRemoveTasks().getTaskIds());
+        break;
+
+      case SAVE_QUOTA:
+        SaveQuota saveQuota = op.getSaveQuota();
+        stores.getQuotaStore().saveQuota(
+            saveQuota.getRole(),
+            ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
+        break;
+
+      case REMOVE_QUOTA:
+        stores.getQuotaStore().removeQuota(op.getRemoveQuota().getRole());
+        break;
+
+      case SAVE_HOST_ATTRIBUTES:
+        HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
+        // Prior to commit 5cf760b, the store would persist maintenance mode changes for
+        // unknown hosts.  5cf760b began rejecting these, but the storage may still
+        // contain entries with a null slave ID.
+        if (attributes.isSetSlaveId()) {
+          stores.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes));
+        } else {
+          LOG.info("Dropping host attributes with no agent ID: " + attributes);
+        }
+        break;
+
+      case SAVE_JOB_UPDATE:
+        stores.getJobUpdateStore().saveJobUpdate(
+            backfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate()));
+        break;
+
+      case SAVE_JOB_UPDATE_EVENT:
+        SaveJobUpdateEvent jobEvent = op.getSaveJobUpdateEvent();
+        stores.getJobUpdateStore().saveJobUpdateEvent(
+            IJobUpdateKey.build(jobEvent.getKey()),
+            IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
+        break;
+
+      case SAVE_JOB_INSTANCE_UPDATE_EVENT:
+        SaveJobInstanceUpdateEvent instanceEvent = op.getSaveJobInstanceUpdateEvent();
+        stores.getJobUpdateStore().saveJobInstanceUpdateEvent(
+            IJobUpdateKey.build(instanceEvent.getKey()),
+            IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
+        break;
+
+      case PRUNE_JOB_UPDATE_HISTORY:
+        LOG.info("Dropping prune operation.  Updates will be pruned later.");
+        break;
+
+      case REMOVE_JOB_UPDATE:
+        stores.getJobUpdateStore().removeJobUpdates(
+            IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys()));
+        break;
+
+      default:
+        throw new IllegalArgumentException("Unrecognized op type " + op.getSetField());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
index 9eb862c..4476d90 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
@@ -13,10 +13,15 @@
  */
 package org.apache.aurora.scheduler.storage.durability;
 
+import java.util.Objects;
 import java.util.stream.Stream;
 
+import javax.annotation.Nullable;
+
 import org.apache.aurora.gen.storage.Op;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Persistence layer for storage operations.
  */
@@ -31,10 +36,10 @@ public interface Persistence {
   /**
    * Recovers previously-persisted records.
    *
-   * @return All persisted records.
+   * @return All edits to apply.
    * @throws PersistenceException If recovery failed.
    */
-  Stream<Op> recover() throws PersistenceException;
+  Stream<Edit> recover() throws PersistenceException;
 
   /**
    * Saves new records.  No records may be considered durably saved until this method returns
@@ -46,6 +51,53 @@ public interface Persistence {
   void persist(Stream<Op> records) throws PersistenceException;
 
   /**
+   * An edit to apply when recovering from persistence.
+   */
+  class Edit {
+    // Null is the marker for a "delete all" edit; every other edit wraps a non-null op.
+    @Nullable private final Op operation;
+
+    private Edit(@Nullable Op operation) {
+      this.operation = operation;
+    }
+
+    /** Creates an edit that applies {@code op}; a null {@code op} is rejected. */
+    public static Edit op(Op op) {
+      return new Edit(requireNonNull(op));
+    }
+
+    /** Creates the marker edit that carries no op. */
+    public static Edit deleteAll() {
+      return new Edit(null);
+    }
+
+    /** Returns whether this is the marker created by {@link #deleteAll()}. */
+    public boolean isDeleteAll() {
+      return operation == null;
+    }
+
+    /** Returns the wrapped op; throws {@link NullPointerException} for a delete-all edit. */
+    public Op getOp() {
+      return requireNonNull(operation);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof Edit && Objects.equals(operation, ((Edit) obj).operation);
+    }
+
+    @Override
+    public int hashCode() {
+      return operation == null ? 0 : operation.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return String.valueOf(operation);
+    }
+  }
+
+  /**
    * Thrown when a persistence operation fails.
    */
   class PersistenceException extends Exception {

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
deleted file mode 100644
index 667db06..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.durability;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveQuota;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdate;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.slf4j.Logger;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
- * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
- * stores.
- */
-public class WriteAheadStorage implements
-    MutableStoreProvider,
-    SchedulerStore.Mutable,
-    CronJobStore.Mutable,
-    TaskStore.Mutable,
-    QuotaStore.Mutable,
-    AttributeStore.Mutable,
-    JobUpdateStore.Mutable {
-
-  private final TransactionManager transactionManager;
-  private final SchedulerStore.Mutable schedulerStore;
-  private final CronJobStore.Mutable jobStore;
-  private final TaskStore.Mutable taskStore;
-  private final QuotaStore.Mutable quotaStore;
-  private final AttributeStore.Mutable attributeStore;
-  private final JobUpdateStore.Mutable jobUpdateStore;
-  private final Logger log;
-  private final EventSink eventSink;
-
-  /**
-   * Creates a new write-ahead storage that delegates to the providing default stores.
-   *
-   * @param transactionManager External controller for transaction operations.
-   * @param schedulerStore Delegate.
-   * @param jobStore       Delegate.
-   * @param taskStore      Delegate.
-   * @param quotaStore     Delegate.
-   * @param attributeStore Delegate.
-   * @param jobUpdateStore Delegate.
-   */
-  public WriteAheadStorage(
-      TransactionManager transactionManager,
-      SchedulerStore.Mutable schedulerStore,
-      CronJobStore.Mutable jobStore,
-      TaskStore.Mutable taskStore,
-      QuotaStore.Mutable quotaStore,
-      AttributeStore.Mutable attributeStore,
-      JobUpdateStore.Mutable jobUpdateStore,
-      Logger log,
-      EventSink eventSink) {
-
-    this.transactionManager = requireNonNull(transactionManager);
-    this.schedulerStore = requireNonNull(schedulerStore);
-    this.jobStore = requireNonNull(jobStore);
-    this.taskStore = requireNonNull(taskStore);
-    this.quotaStore = requireNonNull(quotaStore);
-    this.attributeStore = requireNonNull(attributeStore);
-    this.jobUpdateStore = requireNonNull(jobUpdateStore);
-    this.log = requireNonNull(log);
-    this.eventSink = requireNonNull(eventSink);
-  }
-
-  private void write(Op op) {
-    Preconditions.checkState(
-        transactionManager.hasActiveTransaction(),
-        "Mutating operations must be within a transaction.");
-    transactionManager.log(op);
-  }
-
-  @Override
-  public void saveFrameworkId(final String frameworkId) {
-    requireNonNull(frameworkId);
-
-    write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
-    schedulerStore.saveFrameworkId(frameworkId);
-  }
-
-  @Override
-  public void deleteTasks(final Set<String> taskIds) {
-    requireNonNull(taskIds);
-
-    write(Op.removeTasks(new RemoveTasks(taskIds)));
-    taskStore.deleteTasks(taskIds);
-  }
-
-  @Override
-  public void saveTasks(final Set<IScheduledTask> newTasks) {
-    requireNonNull(newTasks);
-
-    write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
-    taskStore.saveTasks(newTasks);
-  }
-
-  @Override
-  public Optional<IScheduledTask> mutateTask(
-      String taskId,
-      Function<IScheduledTask, IScheduledTask> mutator) {
-
-    Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
-    log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
-    write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
-
-    return mutated;
-  }
-
-  @Override
-  public void saveQuota(final String role, final IResourceAggregate quota) {
-    requireNonNull(role);
-    requireNonNull(quota);
-
-    write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
-    quotaStore.saveQuota(role, quota);
-  }
-
-  @Override
-  public boolean saveHostAttributes(final IHostAttributes attrs) {
-    requireNonNull(attrs);
-
-    boolean changed = attributeStore.saveHostAttributes(attrs);
-    if (changed) {
-      write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
-      eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
-    }
-    return changed;
-  }
-
-  @Override
-  public void removeJob(final IJobKey jobKey) {
-    requireNonNull(jobKey);
-
-    write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
-    jobStore.removeJob(jobKey);
-  }
-
-  @Override
-  public void saveAcceptedJob(final IJobConfiguration jobConfig) {
-    requireNonNull(jobConfig);
-
-    write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
-    jobStore.saveAcceptedJob(jobConfig);
-  }
-
-  @Override
-  public void removeQuota(final String role) {
-    requireNonNull(role);
-
-    write(Op.removeQuota(new RemoveQuota(role)));
-    quotaStore.removeQuota(role);
-  }
-
-  @Override
-  public void saveJobUpdate(IJobUpdate update) {
-    requireNonNull(update);
-
-    write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
-    jobUpdateStore.saveJobUpdate(update);
-  }
-
-  @Override
-  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
-    requireNonNull(key);
-    requireNonNull(event);
-
-    write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
-    jobUpdateStore.saveJobUpdateEvent(key, event);
-  }
-
-  @Override
-  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
-    requireNonNull(key);
-    requireNonNull(event);
-
-    write(Op.saveJobInstanceUpdateEvent(
-        new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
-    jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
-  }
-
-  @Override
-  public void removeJobUpdates(Set<IJobUpdateKey> keys) {
-    requireNonNull(keys);
-
-    // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
-    // read it.  JobUpdates are only removed implicitly when a snapshot is taken.
-    jobUpdateStore.removeJobUpdates(keys);
-  }
-
-  @Override
-  public void deleteAllTasks() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteHostAttributes() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteJobs() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteQuotas() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteAllUpdates() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public SchedulerStore.Mutable getSchedulerStore() {
-    return this;
-  }
-
-  @Override
-  public CronJobStore.Mutable getCronJobStore() {
-    return this;
-  }
-
-  @Override
-  public TaskStore.Mutable getUnsafeTaskStore() {
-    return this;
-  }
-
-  @Override
-  public QuotaStore.Mutable getQuotaStore() {
-    return this;
-  }
-
-  @Override
-  public AttributeStore.Mutable getAttributeStore() {
-    return this;
-  }
-
-  @Override
-  public TaskStore getTaskStore() {
-    return this;
-  }
-
-  @Override
-  public JobUpdateStore.Mutable getJobUpdateStore() {
-    return this;
-  }
-
-  @Override
-  public Optional<String> fetchFrameworkId() {
-    return this.schedulerStore.fetchFrameworkId();
-  }
-
-  @Override
-  public Iterable<IJobConfiguration> fetchJobs() {
-    return this.jobStore.fetchJobs();
-  }
-
-  @Override
-  public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
-    return this.jobStore.fetchJob(jobKey);
-  }
-
-  @Override
-  public Optional<IScheduledTask> fetchTask(String taskId) {
-    return this.taskStore.fetchTask(taskId);
-  }
-
-  @Override
-  public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
-    return this.taskStore.fetchTasks(query);
-  }
-
-  @Override
-  public Set<IJobKey> getJobKeys() {
-    return this.taskStore.getJobKeys();
-  }
-
-  @Override
-  public Optional<IResourceAggregate> fetchQuota(String role) {
-    return this.quotaStore.fetchQuota(role);
-  }
-
-  @Override
-  public Map<String, IResourceAggregate> fetchQuotas() {
-    return this.quotaStore.fetchQuotas();
-  }
-
-  @Override
-  public Optional<IHostAttributes> getHostAttributes(String host) {
-    return this.attributeStore.getHostAttributes(host);
-  }
-
-  @Override
-  public Set<IHostAttributes> getHostAttributes() {
-    return this.attributeStore.getHostAttributes();
-  }
-
-  @Override
-  public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
-    return this.jobUpdateStore.fetchJobUpdates(query);
-  }
-
-  @Override
-  public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
-    return this.jobUpdateStore.fetchJobUpdate(key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
new file mode 100644
index 0000000..5ae834a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.slf4j.Logger;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
+ * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
+ * stores.
+ */
+public class WriteRecorder implements
+    MutableStoreProvider,
+    SchedulerStore.Mutable,
+    CronJobStore.Mutable,
+    TaskStore.Mutable,
+    QuotaStore.Mutable,
+    AttributeStore.Mutable,
+    JobUpdateStore.Mutable {
+
+  private final TransactionManager transactionManager;
+  private final SchedulerStore.Mutable schedulerStore;
+  private final CronJobStore.Mutable jobStore;
+  private final TaskStore.Mutable taskStore;
+  private final QuotaStore.Mutable quotaStore;
+  private final AttributeStore.Mutable attributeStore;
+  private final JobUpdateStore.Mutable jobUpdateStore;
+  private final Logger log;
+  private final EventSink eventSink;
+
+  /**
+   * Creates a new write recorder that delegates to the provided default stores.
+   *
+   * @param transactionManager External controller for transaction operations.
+   * @param schedulerStore Delegate.
+   * @param jobStore       Delegate.
+   * @param taskStore      Delegate.
+   * @param quotaStore     Delegate.
+   * @param attributeStore Delegate.
+   * @param jobUpdateStore Delegate.
+   */
+  public WriteRecorder(
+      TransactionManager transactionManager,
+      SchedulerStore.Mutable schedulerStore,
+      CronJobStore.Mutable jobStore,
+      TaskStore.Mutable taskStore,
+      QuotaStore.Mutable quotaStore,
+      AttributeStore.Mutable attributeStore,
+      JobUpdateStore.Mutable jobUpdateStore,
+      Logger log,
+      EventSink eventSink) {
+
+    this.transactionManager = requireNonNull(transactionManager);
+    this.schedulerStore = requireNonNull(schedulerStore);
+    this.jobStore = requireNonNull(jobStore);
+    this.taskStore = requireNonNull(taskStore);
+    this.quotaStore = requireNonNull(quotaStore);
+    this.attributeStore = requireNonNull(attributeStore);
+    this.jobUpdateStore = requireNonNull(jobUpdateStore);
+    this.log = requireNonNull(log);
+    this.eventSink = requireNonNull(eventSink);
+  }
+
+  private void write(Op op) {
+    Preconditions.checkState(
+        transactionManager.hasActiveTransaction(),
+        "Mutating operations must be within a transaction.");
+    transactionManager.log(op);
+  }
+
+  @Override
+  public void saveFrameworkId(final String frameworkId) {
+    requireNonNull(frameworkId);
+
+    write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+    schedulerStore.saveFrameworkId(frameworkId);
+  }
+
+  @Override
+  public void deleteTasks(final Set<String> taskIds) {
+    requireNonNull(taskIds);
+
+    write(Op.removeTasks(new RemoveTasks(taskIds)));
+    taskStore.deleteTasks(taskIds);
+  }
+
+  @Override
+  public void saveTasks(final Set<IScheduledTask> newTasks) {
+    requireNonNull(newTasks);
+
+    write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
+    taskStore.saveTasks(newTasks);
+  }
+
+  @Override
+  public Optional<IScheduledTask> mutateTask(
+      String taskId, Function<IScheduledTask, IScheduledTask> mutator) {
+    // An absent result carries no task to record, and get() on it would throw, so skip it.
+    Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
+    if (mutated.isPresent()) {
+      log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
+      write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+    }
+    return mutated;
+  }
+
+  @Override
+  public void saveQuota(final String role, final IResourceAggregate quota) {
+    requireNonNull(role);
+    requireNonNull(quota);
+
+    write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+    quotaStore.saveQuota(role, quota);
+  }
+
+  @Override
+  public boolean saveHostAttributes(final IHostAttributes attrs) {
+    requireNonNull(attrs);
+
+    boolean changed = attributeStore.saveHostAttributes(attrs);
+    if (changed) {
+      write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
+      eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
+    }
+    return changed;
+  }
+
+  @Override
+  public void removeJob(final IJobKey jobKey) {
+    requireNonNull(jobKey);
+
+    write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
+    jobStore.removeJob(jobKey);
+  }
+
+  @Override
+  public void saveAcceptedJob(final IJobConfiguration jobConfig) {
+    requireNonNull(jobConfig);
+
+    write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
+    jobStore.saveAcceptedJob(jobConfig);
+  }
+
+  @Override
+  public void removeQuota(final String role) {
+    requireNonNull(role);
+
+    write(Op.removeQuota(new RemoveQuota(role)));
+    quotaStore.removeQuota(role);
+  }
+
+  @Override
+  public void saveJobUpdate(IJobUpdate update) {
+    requireNonNull(update);
+
+    write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+    jobUpdateStore.saveJobUpdate(update);
+  }
+
+  @Override
+  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+    requireNonNull(key);
+    requireNonNull(event);
+
+    write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
+    jobUpdateStore.saveJobUpdateEvent(key, event);
+  }
+
+  @Override
+  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
+    requireNonNull(key);
+    requireNonNull(event);
+
+    write(Op.saveJobInstanceUpdateEvent(
+        new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
+    jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
+  }
+
+  @Override
+  public void removeJobUpdates(Set<IJobUpdateKey> keys) {
+    requireNonNull(keys);
+
+    // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
+    // read it.  JobUpdates are only removed implicitly when a snapshot is taken.
+    jobUpdateStore.removeJobUpdates(keys);
+  }
+
+  @Override
+  public void deleteAllTasks() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteHostAttributes() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteJobs() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteQuotas() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteAllUpdates() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public SchedulerStore.Mutable getSchedulerStore() {
+    return this;
+  }
+
+  @Override
+  public CronJobStore.Mutable getCronJobStore() {
+    return this;
+  }
+
+  @Override
+  public TaskStore.Mutable getUnsafeTaskStore() {
+    return this;
+  }
+
+  @Override
+  public QuotaStore.Mutable getQuotaStore() {
+    return this;
+  }
+
+  @Override
+  public AttributeStore.Mutable getAttributeStore() {
+    return this;
+  }
+
+  @Override
+  public TaskStore getTaskStore() {
+    return this;
+  }
+
+  @Override
+  public JobUpdateStore.Mutable getJobUpdateStore() {
+    return this;
+  }
+
+  @Override
+  public Optional<String> fetchFrameworkId() {
+    return this.schedulerStore.fetchFrameworkId();
+  }
+
+  @Override
+  public Iterable<IJobConfiguration> fetchJobs() {
+    return this.jobStore.fetchJobs();
+  }
+
+  @Override
+  public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
+    return this.jobStore.fetchJob(jobKey);
+  }
+
+  @Override
+  public Optional<IScheduledTask> fetchTask(String taskId) {
+    return this.taskStore.fetchTask(taskId);
+  }
+
+  @Override
+  public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
+    return this.taskStore.fetchTasks(query);
+  }
+
+  @Override
+  public Set<IJobKey> getJobKeys() {
+    return this.taskStore.getJobKeys();
+  }
+
+  @Override
+  public Optional<IResourceAggregate> fetchQuota(String role) {
+    return this.quotaStore.fetchQuota(role);
+  }
+
+  @Override
+  public Map<String, IResourceAggregate> fetchQuotas() {
+    return this.quotaStore.fetchQuotas();
+  }
+
+  @Override
+  public Optional<IHostAttributes> getHostAttributes(String host) {
+    return this.attributeStore.getHostAttributes(host);
+  }
+
+  @Override
+  public Set<IHostAttributes> getHostAttributes() {
+    return this.attributeStore.getHostAttributes();
+  }
+
+  @Override
+  public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
+    return this.jobUpdateStore.fetchJobUpdates(query);
+  }
+
+  @Override
+  public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
+    return this.jobUpdateStore.fetchJobUpdate(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
index e70e605..8ca3169 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
@@ -16,31 +16,19 @@ package org.apache.aurora.scheduler.storage.log;
 import java.io.IOException;
 import java.util.Date;
 import java.util.Iterator;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import javax.inject.Inject;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
-
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.common.application.ShutdownRegistry;
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.Op;
 import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
 import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.Snapshotter;
 import org.apache.aurora.scheduler.storage.durability.Persistence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,42 +38,18 @@ import static java.util.Objects.requireNonNull;
 /**
  * Persistence layer that uses a replicated log.
  */
-class LogPersistence implements Persistence, DistributedSnapshotStore {
+class LogPersistence implements Persistence {
 
   private static final Logger LOG = LoggerFactory.getLogger(LogPersistence.class);
 
   private final LogManager logManager;
-  private final SnapshotStore<Snapshot> snapshotStore;
-  private final SchedulingService schedulingService;
-  private final Amount<Long, Time> snapshotInterval;
+  private final Snapshotter snapshotter;
   private StreamManager streamManager;
 
   @Inject
-  LogPersistence(
-      Settings settings,
-      LogManager logManager,
-      SnapshotStore<Snapshot> snapshotStore,
-      ShutdownRegistry shutdownRegistry) {
-
-    this(new ScheduledExecutorSchedulingService(
-            shutdownRegistry,
-            settings.getShutdownGracePeriod()),
-        settings.getSnapshotInterval(),
-        logManager,
-        snapshotStore);
-  }
-
-  @VisibleForTesting
-  LogPersistence(
-      SchedulingService schedulingService,
-      Amount<Long, Time> snapshotInterval,
-      LogManager logManager,
-      SnapshotStore<Snapshot> snapshotStore) {
-
-    this.schedulingService = requireNonNull(schedulingService);
-    this.snapshotInterval = requireNonNull(snapshotInterval);
+  LogPersistence(LogManager logManager, Snapshotter snapshotter) {
     this.logManager = requireNonNull(logManager);
-    this.snapshotStore = requireNonNull(snapshotStore);
+    this.snapshotter = requireNonNull(snapshotter);
   }
 
   @Override
@@ -98,6 +62,15 @@ class LogPersistence implements Persistence, DistributedSnapshotStore {
     }
   }
 
+  /**
+   * Saves a snapshot to the log stream.
+   *
+   * @param snapshot Snapshot to save.
+   */
+  void persist(Snapshot snapshot) {
+    streamManager.snapshot(snapshot);
+  }
+
   @Override
   public void persist(Stream<Op> mutations) throws PersistenceException {
     try {
@@ -108,9 +81,7 @@ class LogPersistence implements Persistence, DistributedSnapshotStore {
   }
 
   @Override
-  public Stream<Op> recover() throws PersistenceException {
-    scheduleSnapshots();
-
+  public Stream<Edit> recover() throws PersistenceException {
     try {
       Iterator<LogEntry> entries = streamManager.readFromBeginning();
       Iterable<LogEntry> iterableEntries = () -> entries;
@@ -118,139 +89,26 @@ class LogPersistence implements Persistence, DistributedSnapshotStore {
 
       return entryStream
           .filter(entry -> entry.getSetField() != LogEntry._Fields.NOOP)
-          .filter(entry -> {
-            if (entry.getSetField() == LogEntry._Fields.SNAPSHOT) {
-              Snapshot snapshot = entry.getSnapshot();
-              LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
-              snapshotStore.applySnapshot(snapshot);
-              return false;
-            }
-            return true;
-          })
-          .peek(entry -> {
-            if (entry.getSetField() != LogEntry._Fields.TRANSACTION) {
-              throw new IllegalStateException("Unknown log entry type: " + entry.getSetField());
+          .flatMap(entry -> {
+            switch (entry.getSetField()) {
+              case SNAPSHOT:
+                Snapshot snapshot = entry.getSnapshot();
+                LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
+                return Stream.concat(
+                    Stream.of(Edit.deleteAll()),
+                    snapshotter.asStream(snapshot)
+                        .map(Edit::op));
+
+              case TRANSACTION:
+                return entry.getTransaction().getOps().stream()
+                    .map(Edit::op);
+
+              default:
+                throw new IllegalStateException("Unknown log entry type: " + entry.getSetField());
             }
-          })
-          .flatMap(entry -> entry.getTransaction().getOps().stream());
+          });
     } catch (CodingException | InvalidPositionException | StreamAccessException e) {
       throw new PersistenceException(e);
     }
   }
-
-  private void scheduleSnapshots() {
-    if (snapshotInterval.getValue() > 0) {
-      schedulingService.doEvery(snapshotInterval, () -> {
-        try {
-          snapshot();
-        } catch (StorageException e) {
-          if (e.getCause() == null) {
-            LOG.warn("StorageException when attempting to snapshot.", e);
-          } else {
-            LOG.warn(e.getMessage(), e.getCause());
-          }
-        }
-      });
-    }
-  }
-
-  @Override
-  public void snapshot() throws StorageException {
-    try {
-      doSnapshot();
-    } catch (CodingException e) {
-      throw new StorageException("Failed to encode a snapshot", e);
-    } catch (InvalidPositionException e) {
-      throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
-    } catch (StreamAccessException e) {
-      throw new StorageException("Failed to create a snapshot", e);
-    }
-  }
-
-  @Timed("scheduler_log_snapshot_persist")
-  @Override
-  public void snapshotWith(Snapshot snapshot)
-      throws CodingException, InvalidPositionException, StreamAccessException {
-
-    streamManager.snapshot(snapshot);
-  }
-
-  /**
-   * Forces a snapshot of the storage state.
-   *
-   * @throws CodingException If there is a problem encoding the snapshot.
-   * @throws InvalidPositionException If the log stream cursor is invalid.
-   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
-   */
-  @Timed("scheduler_log_snapshot")
-  void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
-    LOG.info("Creating snapshot.");
-    Snapshot snapshot = snapshotStore.createSnapshot();
-    snapshotWith(snapshot);
-    LOG.info("Snapshot complete."
-        + " host attrs: " + snapshot.getHostAttributesSize()
-        + ", cron jobs: " + snapshot.getCronJobsSize()
-        + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
-        + ", tasks: " + snapshot.getTasksSize()
-        + ", updates: " + snapshot.getJobUpdateDetailsSize());
-  }
-
-  /**
-   * A service that can schedule an action to be executed periodically.
-   */
-  @VisibleForTesting
-  interface SchedulingService {
-
-    /**
-     * Schedules an action to execute periodically.
-     *
-     * @param interval The time period to wait until running the {@code action} again.
-     * @param action The action to execute periodically.
-     */
-    void doEvery(Amount<Long, Time> interval, Runnable action);
-  }
-
-  private static class ScheduledExecutorSchedulingService implements SchedulingService {
-    private final ScheduledExecutorService scheduledExecutor;
-
-    ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
-                                       Amount<Long, Time> shutdownGracePeriod) {
-      scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG);
-      shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination(
-          scheduledExecutor,
-          shutdownGracePeriod.getValue(),
-          shutdownGracePeriod.getUnit().getTimeUnit()));
-    }
-
-    @Override
-    public void doEvery(Amount<Long, Time> interval, Runnable action) {
-      requireNonNull(interval);
-      requireNonNull(action);
-
-      long delay = interval.getValue();
-      TimeUnit timeUnit = interval.getUnit().getTimeUnit();
-      scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
-    }
-  }
-
-  /**
-   * Configuration settings for log persistence.
-   */
-  public static class Settings {
-    private final Amount<Long, Time> shutdownGracePeriod;
-    private final Amount<Long, Time> snapshotInterval;
-
-    Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
-      this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
-      this.snapshotInterval = requireNonNull(snapshotInterval);
-    }
-
-    public Amount<Long, Time> getShutdownGracePeriod() {
-      return shutdownGracePeriod;
-    }
-
-    public Amount<Long, Time> getSnapshotInterval() {
-      return snapshotInterval;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index 75ec42a..671593c 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -19,6 +19,7 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
+import com.google.inject.AbstractModule;
 import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
@@ -26,33 +27,28 @@ import com.google.inject.assistedinject.FactoryModuleBuilder;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.config.types.DataAmount;
 import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.durability.DurableStorage;
 import org.apache.aurora.scheduler.storage.durability.Persistence;
+import org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
+import org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
 import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import org.apache.aurora.scheduler.storage.log.LogPersistence.Settings;
-
-import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
-import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
-import static org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
+import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
+import org.apache.aurora.scheduler.storage.log.SnapshotService.Settings;
 
 /**
  * Bindings for scheduler distributed log based storage.
  */
-public class LogStorageModule extends PrivateModule {
+public class LogStorageModule extends AbstractModule {
 
   @Parameters(separators = "=")
   public static class Options {
-    @Parameter(names = "-dlog_shutdown_grace_period",
-        description = "Specifies the maximum time to wait for scheduled checkpoint and snapshot "
-            + "actions to complete before forcibly shutting down.")
-    public TimeAmount shutdownGracePeriod = new TimeAmount(2, Time.SECONDS);
-
     @Parameter(names = "-dlog_snapshot_interval",
         description = "Specifies the frequency at which snapshots of local storage are taken and "
             + "written to the log.")
@@ -73,34 +69,42 @@ public class LogStorageModule extends PrivateModule {
 
   @Override
   protected void configure() {
-    bind(Settings.class)
-        .toInstance(new Settings(options.shutdownGracePeriod, options.snapshotInterval));
-
-    bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
-        .toInstance(options.maxLogEntrySize);
-    bind(LogManager.class).in(Singleton.class);
-    bind(DurableStorage.class).in(Singleton.class);
-
-    install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
-    bind(LogPersistence.class).in(Singleton.class);
-    bind(Persistence.class).to(LogPersistence.class);
-    bind(DistributedSnapshotStore.class).to(LogPersistence.class);
-    expose(Persistence.class);
-    expose(Storage.class);
-    expose(NonVolatileStorage.class);
-    expose(DistributedSnapshotStore.class);
-
-    bind(EntrySerializer.class).to(EntrySerializerImpl.class);
-    // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
-    // versus a faster error-detection checksum like CRC32 for large Snapshots.
-    @SuppressWarnings("deprecation")
-    HashFunction hashFunction = Hashing.md5();
-    bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction);
-
-    bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class);
-
-    install(new FactoryModuleBuilder()
-        .implement(StreamManager.class, StreamManagerImpl.class)
-        .build(StreamManagerFactory.class));
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(Settings.class).toInstance(new Settings(options.snapshotInterval));
+
+        bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
+            .toInstance(options.maxLogEntrySize);
+        bind(LogManager.class).in(Singleton.class);
+        bind(DurableStorage.class).in(Singleton.class);
+
+        install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
+        bind(LogPersistence.class).in(Singleton.class);
+        bind(Persistence.class).to(LogPersistence.class);
+        bind(SnapshotStore.class).to(SnapshotService.class);
+        bind(SnapshotService.class).in(Singleton.class);
+        expose(SnapshotService.class);
+        expose(Persistence.class);
+        expose(Storage.class);
+        expose(NonVolatileStorage.class);
+        expose(SnapshotStore.class);
+
+        bind(EntrySerializer.class).to(EntrySerializerImpl.class);
+        // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
+        // versus a faster error-detection checksum like CRC32 for large Snapshots.
+        @SuppressWarnings("deprecation")
+        HashFunction hashFunction = Hashing.md5();
+        bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction);
+
+        bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class);
+
+        install(new FactoryModuleBuilder()
+            .implement(StreamManager.class, StreamManagerImpl.class)
+            .build(StreamManagerFactory.class));
+      }
+    });
+
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SnapshotService.class);
   }
 }