Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/03 03:59:12 UTC

[3/4] aurora git commit: Extract a storage Persistence layer

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
deleted file mode 100644
index 07b4bdb..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.common.application.ShutdownRegistry;
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.SlidingStats;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.base.AsyncUtil;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
-import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A storage implementation that ensures committed transactions are written to a log.
- *
- * <p>In the classic write-ahead log usage we'd perform mutations as follows:
- * <ol>
- *   <li>write op to log</li>
- *   <li>perform op locally</li>
- *   <li>*checkpoint</li>
- * </ol>
- *
- * <p>Writing the operation to the log provides us with a fast persistence mechanism to ensure we
- * have a record of our mutation in case we should need to recover state later after a crash or on
- * a new host (assuming the log is distributed).  We then apply the mutation to a local (in-memory)
- * data structure for serving fast read requests and then optionally write down the position of the
- * log entry we wrote in the first step to stable storage to allow for quicker recovery after a
- * crash. Instead of reading the whole log, we can read all entries past the checkpoint.  This
- * design implies that all mutations must be idempotent and free from constraint and thus
- * replayable over newer operations when recovering from an old checkpoint.
- *
- * <p>The important detail in our case is the possibility of writing an op to the log, and then
- * failing to commit locally since we use a local database instead of an in-memory data structure.
- * If we die after such a failure, then another instance can read and apply the logged op
- * erroneously.
- *
- * <p>This implementation leverages a local transaction to handle this:
- * <ol>
- *   <li>start local transaction</li>
- *   <li>perform op locally (uncommitted!)</li>
- *   <li>write op to log</li>
- *   <li>commit local transaction</li>
- *   <li>*checkpoint</li>
- * </ol>
- *
- * <p>If the op fails to apply to local storage we will never write the op to the log and if the op
- * fails to apply to the log, it'll throw and abort the local storage transaction as well.
- */
-public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore {
-
-  /**
-   * A service that can schedule an action to be executed periodically.
-   */
-  @VisibleForTesting
-  interface SchedulingService {
-
-    /**
-     * Schedules an action to execute periodically.
-     *
-     * @param interval The time period to wait until running the {@code action} again.
-     * @param action The action to execute periodically.
-     */
-    void doEvery(Amount<Long, Time> interval, Runnable action);
-  }
-
-  /**
-   * A maintainer for context about open transactions. Assumes that an external entity is
-   * responsible for opening and closing transactions.
-   */
-  interface TransactionManager {
-
-    /**
-     * Checks whether there is an open transaction.
-     *
-     * @return {@code true} if there is an open transaction, {@code false} otherwise.
-     */
-    boolean hasActiveTransaction();
-
-    /**
-     * Adds an operation to the existing transaction.
-     *
-     * @param op Operation to include in the existing transaction.
-     */
-    void log(Op op);
-  }
-
-  private static class ScheduledExecutorSchedulingService implements SchedulingService {
-    private final ScheduledExecutorService scheduledExecutor;
-
-    ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
-        Amount<Long, Time> shutdownGracePeriod) {
-      scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG);
-      shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination(
-          scheduledExecutor,
-          shutdownGracePeriod.getValue(),
-          shutdownGracePeriod.getUnit().getTimeUnit()));
-    }
-
-    @Override
-    public void doEvery(Amount<Long, Time> interval, Runnable action) {
-      requireNonNull(interval);
-      requireNonNull(action);
-
-      long delay = interval.getValue();
-      TimeUnit timeUnit = interval.getUnit().getTimeUnit();
-      scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
-    }
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(LogStorage.class);
-
-  private final LogManager logManager;
-  private final SchedulingService schedulingService;
-  private final SnapshotStore<Snapshot> snapshotStore;
-  private final Amount<Long, Time> snapshotInterval;
-  private final Storage writeBehindStorage;
-  private final SchedulerStore.Mutable writeBehindSchedulerStore;
-  private final CronJobStore.Mutable writeBehindJobStore;
-  private final TaskStore.Mutable writeBehindTaskStore;
-  private final QuotaStore.Mutable writeBehindQuotaStore;
-  private final AttributeStore.Mutable writeBehindAttributeStore;
-  private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
-  private final ReentrantLock writeLock;
-  private final ThriftBackfill thriftBackfill;
-
-  private StreamManager streamManager;
-  private final WriteAheadStorage writeAheadStorage;
-
-  // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
-  // recovering are controlled at this layer (they're all calls to Mutable store implementations).
-  // The more involved change is changing SnapshotStore to accept a Mutable store provider to
-  // avoid a call to Storage.write() when we replay a Snapshot.
-  private boolean recovered = false;
-  private StreamTransaction transaction = null;
-
-  private final SlidingStats writerWaitStats =
-      new SlidingStats("log_storage_write_lock_wait", "ns");
-
-  private final Map<LogEntry._Fields, Consumer<LogEntry>> logEntryReplayActions;
-  private final Map<Op._Fields, Consumer<Op>> transactionReplayActions;
-
-  @Inject
-  LogStorage(
-      LogManager logManager,
-      ShutdownRegistry shutdownRegistry,
-      Settings settings,
-      SnapshotStore<Snapshot> snapshotStore,
-      @Volatile Storage storage,
-      @Volatile SchedulerStore.Mutable schedulerStore,
-      @Volatile CronJobStore.Mutable jobStore,
-      @Volatile TaskStore.Mutable taskStore,
-      @Volatile QuotaStore.Mutable quotaStore,
-      @Volatile AttributeStore.Mutable attributeStore,
-      @Volatile JobUpdateStore.Mutable jobUpdateStore,
-      EventSink eventSink,
-      ReentrantLock writeLock,
-      ThriftBackfill thriftBackfill) {
-
-    this(logManager,
-        new ScheduledExecutorSchedulingService(shutdownRegistry, settings.getShutdownGracePeriod()),
-        snapshotStore,
-        settings.getSnapshotInterval(),
-        storage,
-        schedulerStore,
-        jobStore,
-        taskStore,
-        quotaStore,
-        attributeStore,
-        jobUpdateStore,
-        eventSink,
-        writeLock,
-        thriftBackfill);
-  }
-
-  @VisibleForTesting
-  LogStorage(
-      LogManager logManager,
-      SchedulingService schedulingService,
-      SnapshotStore<Snapshot> snapshotStore,
-      Amount<Long, Time> snapshotInterval,
-      Storage delegateStorage,
-      SchedulerStore.Mutable schedulerStore,
-      CronJobStore.Mutable jobStore,
-      TaskStore.Mutable taskStore,
-      QuotaStore.Mutable quotaStore,
-      AttributeStore.Mutable attributeStore,
-      JobUpdateStore.Mutable jobUpdateStore,
-      EventSink eventSink,
-      ReentrantLock writeLock,
-      ThriftBackfill thriftBackfill) {
-
-    this.logManager = requireNonNull(logManager);
-    this.schedulingService = requireNonNull(schedulingService);
-    this.snapshotStore = requireNonNull(snapshotStore);
-    this.snapshotInterval = requireNonNull(snapshotInterval);
-
-    // Log storage has two distinct operating modes: pre- and post-recovery.  When recovering,
-    // we write directly to the writeBehind stores since we are replaying what's already persisted.
-    // After that, all writes must succeed in the distributed log before they may be considered
-    // successful.
-    this.writeBehindStorage = requireNonNull(delegateStorage);
-    this.writeBehindSchedulerStore = requireNonNull(schedulerStore);
-    this.writeBehindJobStore = requireNonNull(jobStore);
-    this.writeBehindTaskStore = requireNonNull(taskStore);
-    this.writeBehindQuotaStore = requireNonNull(quotaStore);
-    this.writeBehindAttributeStore = requireNonNull(attributeStore);
-    this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
-    this.writeLock = requireNonNull(writeLock);
-    this.thriftBackfill = requireNonNull(thriftBackfill);
-    TransactionManager transactionManager = new TransactionManager() {
-      @Override
-      public boolean hasActiveTransaction() {
-        return transaction != null;
-      }
-
-      @Override
-      public void log(Op op) {
-        transaction.add(op);
-      }
-    };
-    this.writeAheadStorage = new WriteAheadStorage(
-        transactionManager,
-        schedulerStore,
-        jobStore,
-        taskStore,
-        quotaStore,
-        attributeStore,
-        jobUpdateStore,
-        LoggerFactory.getLogger(WriteAheadStorage.class),
-        eventSink);
-
-    this.logEntryReplayActions = buildLogEntryReplayActions();
-    this.transactionReplayActions = buildTransactionReplayActions();
-  }
-
-  @VisibleForTesting
-  final Map<LogEntry._Fields, Consumer<LogEntry>> buildLogEntryReplayActions() {
-    return ImmutableMap.<LogEntry._Fields, Consumer<LogEntry>>builder()
-        .put(LogEntry._Fields.SNAPSHOT, logEntry -> {
-          Snapshot snapshot = logEntry.getSnapshot();
-          LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
-          snapshotStore.applySnapshot(snapshot);
-        })
-        .put(LogEntry._Fields.TRANSACTION, logEntry -> write((NoResult.Quiet) unused -> {
-          for (Op op : logEntry.getTransaction().getOps()) {
-            replayOp(op);
-          }
-        }))
-        .put(LogEntry._Fields.NOOP, item -> {
-          // Nothing to do here
-        })
-        .build();
-  }
-
-  @VisibleForTesting
-  final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() {
-    return ImmutableMap.<Op._Fields, Consumer<Op>>builder()
-        .put(
-            Op._Fields.SAVE_FRAMEWORK_ID,
-            op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
-        .put(Op._Fields.SAVE_CRON_JOB, op -> {
-          SaveCronJob cronJob = op.getSaveCronJob();
-          writeBehindJobStore.saveAcceptedJob(
-              thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig()));
-        })
-        .put(
-            Op._Fields.REMOVE_JOB,
-            op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())))
-        .put(
-            Op._Fields.SAVE_TASKS,
-            op -> writeBehindTaskStore.saveTasks(
-                thriftBackfill.backfillTasks(op.getSaveTasks().getTasks())))
-        .put(
-            Op._Fields.REMOVE_TASKS,
-            op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()))
-        .put(Op._Fields.SAVE_QUOTA, op -> {
-          SaveQuota saveQuota = op.getSaveQuota();
-          writeBehindQuotaStore.saveQuota(
-              saveQuota.getRole(),
-              ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
-        })
-        .put(
-            Op._Fields.REMOVE_QUOTA,
-            op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()))
-        .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> {
-          HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
-          // Prior to commit 5cf760b, the store would persist maintenance mode changes for
-          // unknown hosts.  5cf760b began rejecting these, but the replicated log may still
-          // contain entries with a null slave ID.
-          if (attributes.isSetSlaveId()) {
-            writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
-          } else {
-            LOG.info("Dropping host attributes with no agent ID: " + attributes);
-          }
-        })
-        .put(Op._Fields.SAVE_JOB_UPDATE, op ->
-          writeBehindJobUpdateStore.saveJobUpdate(
-              thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())))
-        .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
-          SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
-          writeBehindJobUpdateStore.saveJobUpdateEvent(
-              IJobUpdateKey.build(event.getKey()),
-              IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
-        })
-        .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> {
-          SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
-          writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
-              IJobUpdateKey.build(event.getKey()),
-              IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
-        })
-        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
-          LOG.info("Dropping prune operation.  Updates will be pruned later.");
-        })
-        .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
-          writeBehindJobUpdateStore.removeJobUpdates(
-              IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
-        .build();
-  }
-
-  @Override
-  @Timed("scheduler_storage_prepare")
-  public synchronized void prepare() {
-    writeBehindStorage.prepare();
-    // Open the log to make a log replica available to the scheduler group.
-    try {
-      streamManager = logManager.open();
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to open the log, cannot continue", e);
-    }
-  }
-
-  @Override
-  @Timed("scheduler_storage_start")
-  public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
-    write((NoResult.Quiet) unused -> {
-      // Must have the underlying storage started so we can query it for the last checkpoint.
-      // We replay these entries in the forwarded storage system's transactions but not ours - we
-      // do not want to re-record these ops to the log.
-      recover();
-      recovered = true;
-
-      // Now that we're recovered we should let any mutations done in initializationLogic append
-      // to the log, so run it in one of our transactions.
-      write(initializationLogic);
-    });
-
-    scheduleSnapshots();
-  }
-
-  @Override
-  public void stop() {
-    // No-op.
-  }
-
-  @Timed("scheduler_log_recover")
-  void recover() throws RecoveryFailedException {
-    try {
-      streamManager.readFromBeginning(LogStorage.this::replay);
-    } catch (CodingException | InvalidPositionException | StreamAccessException e) {
-      throw new RecoveryFailedException(e);
-    }
-  }
-
-  private static final class RecoveryFailedException extends SchedulerException {
-    RecoveryFailedException(Throwable cause) {
-      super(cause);
-    }
-  }
-
-  private void replay(final LogEntry logEntry) {
-    LogEntry._Fields entryField = logEntry.getSetField();
-    if (!logEntryReplayActions.containsKey(entryField)) {
-      throw new IllegalStateException("Unknown log entry type: " + entryField);
-    }
-
-    logEntryReplayActions.get(entryField).accept(logEntry);
-  }
-
-  private void replayOp(Op op) {
-    Op._Fields opField = op.getSetField();
-    if (!transactionReplayActions.containsKey(opField)) {
-      throw new IllegalStateException("Unknown transaction op: " + opField);
-    }
-
-    transactionReplayActions.get(opField).accept(op);
-  }
-
-  private void scheduleSnapshots() {
-    if (snapshotInterval.getValue() > 0) {
-      schedulingService.doEvery(snapshotInterval, () -> {
-        try {
-          snapshot();
-        } catch (StorageException e) {
-          if (e.getCause() == null) {
-            LOG.warn("StorageException when attempting to snapshot.", e);
-          } else {
-            LOG.warn(e.getMessage(), e.getCause());
-          }
-        }
-      });
-    }
-  }
-
-  /**
-   * Forces a snapshot of the storage state.
-   *
-   * @throws CodingException If there is a problem encoding the snapshot.
-   * @throws InvalidPositionException If the log stream cursor is invalid.
-   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
-   */
-  @Timed("scheduler_log_snapshot")
-  void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
-    write((NoResult<CodingException>) (MutableStoreProvider unused) -> {
-      LOG.info("Creating snapshot.");
-      Snapshot snapshot = snapshotStore.createSnapshot();
-      persist(snapshot);
-      LOG.info("Snapshot complete."
-          + " host attrs: " + snapshot.getHostAttributesSize()
-          + ", cron jobs: " + snapshot.getCronJobsSize()
-          + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
-          + ", tasks: " + snapshot.getTasksSize()
-          + ", updates: " + snapshot.getJobUpdateDetailsSize());
-    });
-  }
-
-  @Timed("scheduler_log_snapshot_persist")
-  @Override
-  public void persist(Snapshot snapshot)
-      throws CodingException, InvalidPositionException, StreamAccessException {
-
-    streamManager.snapshot(snapshot);
-  }
-
-  private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
-      throws StorageException, E {
-
-    // The log stream transaction has already been set up so we just need to delegate with our
-    // store provider so any mutations performed by work get logged.
-    if (transaction != null) {
-      return work.apply(writeAheadStorage);
-    }
-
-    transaction = streamManager.startTransaction();
-    try {
-      return writeBehindStorage.write(unused -> {
-        T result = work.apply(writeAheadStorage);
-        try {
-          transaction.commit();
-        } catch (CodingException e) {
-          throw new IllegalStateException(
-              "Problem encoding transaction operations to the log stream", e);
-        } catch (StreamAccessException e) {
-          throw new StorageException(
-              "There was a problem committing the transaction to the log.", e);
-        }
-        return result;
-      });
-    } finally {
-      transaction = null;
-    }
-  }
-
-  @Override
-  public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
-    long waitStart = System.nanoTime();
-    writeLock.lock();
-    try {
-      writerWaitStats.accumulate(System.nanoTime() - waitStart);
-      // We don't want to use the log when recovering from it, we just want to update the underlying
-      // store - so pass mutations straight through to the underlying storage.
-      if (!recovered) {
-        return writeBehindStorage.write(work);
-      }
-
-      return doInTransaction(work);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  @Override
-  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
-    return writeBehindStorage.read(work);
-  }
-
-  @Override
-  public void snapshot() throws StorageException {
-    try {
-      doSnapshot();
-    } catch (CodingException e) {
-      throw new StorageException("Failed to encode a snapshot", e);
-    } catch (InvalidPositionException e) {
-      throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
-    } catch (StreamAccessException e) {
-      throw new StorageException("Failed to create a snapshot", e);
-    }
-  }
-
-  /**
-   * Configuration settings for log storage.
-   */
-  public static class Settings {
-    private final Amount<Long, Time> shutdownGracePeriod;
-    private final Amount<Long, Time> snapshotInterval;
-
-    public Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
-      this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
-      this.snapshotInterval = requireNonNull(snapshotInterval);
-    }
-
-    public Amount<Long, Time> getShutdownGracePeriod() {
-      return shutdownGracePeriod;
-    }
-
-    public Amount<Long, Time> getSnapshotInterval() {
-      return snapshotInterval;
-    }
-  }
-}
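
The class comment of the deleted LogStorage above spells out the write-ahead ordering it relied on: apply the operation inside an uncommitted local transaction, append it to the distributed log, and only then commit locally, so that a failed log write can never leave a committed but unlogged mutation. The following is a minimal sketch of that ordering only; LocalTransaction and DurableLog are hypothetical stand-ins for illustration, not Aurora interfaces.

    import java.util.List;

    // Hypothetical stand-ins for a local store transaction and the durable log.
    interface LocalTransaction {
      void apply(String op);   // mutate local storage, still uncommitted
      void commit();
      void abort();
    }

    interface DurableLog {
      void append(List<String> ops) throws Exception;  // may fail
    }

    final class WriteAheadSketch {
      private final DurableLog log;

      WriteAheadSketch(DurableLog log) {
        this.log = log;
      }

      // Mirrors the ordering in the javadoc: local-uncommitted -> log -> local-commit.
      void write(LocalTransaction txn, List<String> ops) throws Exception {
        try {
          for (String op : ops) {
            txn.apply(op);     // 1. perform the op locally (uncommitted)
          }
          log.append(ops);     // 2. write the ops to the log
          txn.commit();        // 3. commit the local transaction
        } catch (Exception e) {
          txn.abort();         // a log failure aborts the local transaction too
          throw e;
        }
      }
    }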

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index c8dc7ad..75ec42a 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -32,8 +32,10 @@ import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
 import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import org.apache.aurora.scheduler.storage.log.LogStorage.Settings;
+import org.apache.aurora.scheduler.storage.log.LogPersistence.Settings;
 
 import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
 import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
@@ -77,10 +79,13 @@ public class LogStorageModule extends PrivateModule {
     bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
         .toInstance(options.maxLogEntrySize);
     bind(LogManager.class).in(Singleton.class);
-    bind(LogStorage.class).in(Singleton.class);
+    bind(DurableStorage.class).in(Singleton.class);
 
-    install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class));
-    bind(DistributedSnapshotStore.class).to(LogStorage.class);
+    install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
+    bind(LogPersistence.class).in(Singleton.class);
+    bind(Persistence.class).to(LogPersistence.class);
+    bind(DistributedSnapshotStore.class).to(LogPersistence.class);
+    expose(Persistence.class);
     expose(Storage.class);
     expose(NonVolatileStorage.class);
     expose(DistributedSnapshotStore.class);
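
The rewired module binds DurableStorage behind CallOrderEnforcingStorage and routes both Persistence and DistributedSnapshotStore to LogPersistence, exposing only the interfaces the rest of the scheduler needs. As a reminder of the PrivateModule mechanics in play (bindings stay private unless exposed), here is a generic sketch with placeholder types rather than the Aurora classes:

    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.PrivateModule;
    import com.google.inject.Singleton;

    // Placeholder types standing in for an interface and its implementation.
    interface Greeter { String greet(); }

    class ConsoleGreeter implements Greeter {
      @Override public String greet() { return "hello"; }
    }

    class GreeterModule extends PrivateModule {
      @Override
      protected void configure() {
        // Bindings are private to this module unless explicitly exposed.
        bind(ConsoleGreeter.class).in(Singleton.class);
        bind(Greeter.class).to(ConsoleGreeter.class);
        expose(Greeter.class);  // only Greeter is visible to the rest of the graph
      }
    }

    class Demo {
      public static void main(String[] args) {
        Injector injector = Guice.createInjector(new GreeterModule());
        System.out.println(injector.getInstance(Greeter.class).greet());
      }
    }

Anything not passed to expose() stays internal to the module, which is why LogPersistence itself is bound but never visible outside LogStorageModule.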

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 5859f80..739fad7 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -48,6 +48,7 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
index ea147c0..18da32d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
@@ -13,7 +13,7 @@
  */
 package org.apache.aurora.scheduler.storage.log;
 
-import java.util.function.Consumer;
+import java.util.Iterator;
 
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.Snapshot;
@@ -25,23 +25,21 @@ import static org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
 
 /**
  * Manages interaction with the log stream.  Log entries can be
- * {@link #readFromBeginning(Consumer) read from} the beginning,
+ * {@link #readFromBeginning() read from} the beginning,
  * a {@link #startTransaction() transaction} consisting of one or more local storage
  * operations can be committed atomically, or the log can be compacted by
  * {@link #snapshot(org.apache.aurora.gen.storage.Snapshot) snapshotting}.
  */
 public interface StreamManager {
   /**
-   * Reads all entries in the log stream after the given position.  If the position
-   * supplied is {@code null} then all log entries in the stream will be read.
+   * Reads all entries in the log stream.
    *
-   * @param reader A reader that will be handed log entries decoded from the stream.
+   * @return All stored log entries.
    * @throws CodingException if there was a problem decoding a log entry from the stream.
    * @throws InvalidPositionException if the given position is not found in the log.
    * @throws StreamAccessException if there is a problem reading from the log.
    */
-  void readFromBeginning(Consumer<LogEntry> reader)
-      throws CodingException, InvalidPositionException, StreamAccessException;
+  Iterator<LogEntry> readFromBeginning() throws CodingException, StreamAccessException;
 
   /**
    * Truncates all entries in the log stream occuring before the given position.  The entry at the
@@ -54,8 +52,7 @@ public interface StreamManager {
   void truncateBefore(Log.Position position);
 
   /**
-   * Starts a transaction that can be used to commit a series of {@link Op}s to the log stream
-   * atomically.
+   * Starts a transaction that can be used to commit a series of ops to the log stream atomically.
    *
    * @return StreamTransaction A transaction manager to handle batching up commits to the
    *    underlying stream.
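
Since readFromBeginning() now returns an Iterator<LogEntry> instead of pushing entries into a Consumer, recovery becomes a pull-based loop on the caller's side. A rough caller-side sketch under that assumption follows; replay() is a placeholder for whatever the durability layer does with each entry.

    import java.util.Iterator;

    import org.apache.aurora.gen.storage.LogEntry;
    import org.apache.aurora.scheduler.storage.log.StreamManager;

    class RecoverySketch {
      private final StreamManager streamManager;

      RecoverySketch(StreamManager streamManager) {
        this.streamManager = streamManager;
      }

      void recover() throws Exception {
        // Entries are decoded lazily as the iterator is advanced.
        Iterator<LogEntry> entries = streamManager.readFromBeginning();
        while (entries.hasNext()) {
          replay(entries.next());
        }
      }

      private void replay(LogEntry entry) {
        // Placeholder: dispatch on entry.getSetField() (snapshot, transaction, no-op).
      }
    }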

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
index baf2647..c5b107f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -19,12 +19,12 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
 
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -95,31 +95,37 @@ class StreamManagerImpl implements StreamManager {
   }
 
   @Override
-  public void readFromBeginning(Consumer<LogEntry> reader)
+  public Iterator<LogEntry> readFromBeginning()
       throws CodingException, InvalidPositionException, StreamAccessException {
 
     Iterator<Log.Entry> entries = stream.readAll();
 
-    while (entries.hasNext()) {
-      LogEntry logEntry = decodeLogEntry(entries.next());
-      while (logEntry != null && isFrame(logEntry)) {
-        logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
-      }
-      if (logEntry != null) {
-        if (logEntry.isSet(LogEntry._Fields.DEFLATED_ENTRY)) {
-          logEntry = Entries.inflate(logEntry);
-          vars.deflatedEntriesRead.incrementAndGet();
-        }
-
-        if (logEntry.isSetDeduplicatedSnapshot()) {
-          logEntry = LogEntry.snapshot(
-              snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot()));
+    return new AbstractIterator<LogEntry>() {
+      @Override
+      protected LogEntry computeNext() {
+        while (entries.hasNext()) {
+          LogEntry logEntry = decodeLogEntry(entries.next());
+          while (logEntry != null && isFrame(logEntry)) {
+            logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
+          }
+          if (logEntry != null) {
+            if (logEntry.isSet(LogEntry._Fields.DEFLATED_ENTRY)) {
+              logEntry = Entries.inflate(logEntry);
+              vars.deflatedEntriesRead.incrementAndGet();
+            }
+
+            if (logEntry.isSetDeduplicatedSnapshot()) {
+              logEntry = LogEntry.snapshot(
+                  snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot()));
+            }
+
+            vars.entriesRead.incrementAndGet();
+            return logEntry;
+          }
         }
-
-        reader.accept(logEntry);
-        vars.entriesRead.incrementAndGet();
+        return endOfData();
       }
-    }
+    };
   }
 
   @Nullable

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
deleted file mode 100644
index 92b64bb..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import com.google.inject.Inject;
-
-import org.apache.aurora.GuavaUtils;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateInstructions;
-import org.apache.aurora.gen.Resource;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.TierInfo;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.resources.ResourceType;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IResource;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.lang.String.format;
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
-import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
-import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
-
-/**
- * Helps migrating thrift schema by populating deprecated and/or replacement fields.
- */
-public final class ThriftBackfill {
-
-  private final TierManager tierManager;
-
-  @Inject
-  public ThriftBackfill(TierManager tierManager) {
-    this.tierManager = requireNonNull(tierManager);
-  }
-
-  private static Resource getResource(Set<Resource> resources, ResourceType type) {
-    return resources.stream()
-            .filter(e -> ResourceType.fromResource(IResource.build(e)).equals(type))
-            .findFirst()
-            .orElseThrow(() ->
-                    new IllegalArgumentException("Missing resource definition for " + type));
-  }
-
-  /**
-   * Ensures TaskConfig.resources and correspondent task-level fields are all populated.
-   *
-   * @param config TaskConfig to backfill.
-   * @return Backfilled TaskConfig.
-   */
-  public TaskConfig backfillTask(TaskConfig config) {
-    backfillTier(config);
-    return config;
-  }
-
-  private void backfillTier(TaskConfig config) {
-    ITaskConfig taskConfig = ITaskConfig.build(config);
-    if (config.isSetTier()) {
-      TierInfo tier = tierManager.getTier(taskConfig);
-      config.setProduction(!tier.isPreemptible() && !tier.isRevocable());
-    } else {
-      config.setTier(tierManager.getTiers()
-          .entrySet()
-          .stream()
-          .filter(e -> e.getValue().isPreemptible() == !taskConfig.isProduction()
-              && !e.getValue().isRevocable())
-          .findFirst()
-          .orElseThrow(() -> new IllegalStateException(
-              format("No matching implicit tier for task of job %s", taskConfig.getJob())))
-          .getKey());
-    }
-  }
-
-  /**
-   * Backfills JobConfiguration. See {@link #backfillTask(TaskConfig)}.
-   *
-   * @param jobConfig JobConfiguration to backfill.
-   * @return Backfilled JobConfiguration.
-   */
-  public IJobConfiguration backfillJobConfiguration(JobConfiguration jobConfig) {
-    backfillTask(jobConfig.getTaskConfig());
-    return IJobConfiguration.build(jobConfig);
-  }
-
-  /**
-   * Backfills set of tasks. See {@link #backfillTask(TaskConfig)}.
-   *
-   * @param tasks Set of tasks to backfill.
-   * @return Backfilled set of tasks.
-   */
-  public Set<IScheduledTask> backfillTasks(Set<ScheduledTask> tasks) {
-    return tasks.stream()
-        .map(t -> backfillScheduledTask(t))
-        .map(IScheduledTask::build)
-        .collect(GuavaUtils.toImmutableSet());
-  }
-
-  /**
-   * Ensures ResourceAggregate.resources and correspondent deprecated fields are all populated.
-   *
-   * @param aggregate ResourceAggregate to backfill.
-   * @return Backfilled IResourceAggregate.
-   */
-  public static IResourceAggregate backfillResourceAggregate(ResourceAggregate aggregate) {
-    if (!aggregate.isSetResources() || aggregate.getResources().isEmpty()) {
-      aggregate.addToResources(Resource.numCpus(aggregate.getNumCpus()));
-      aggregate.addToResources(Resource.ramMb(aggregate.getRamMb()));
-      aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb()));
-    } else {
-      EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES;
-      if (aggregate.getResources().size() > quotaResources.size()) {
-        throw new IllegalArgumentException("Too many resource values in quota.");
-      }
-
-      if (!quotaResources.equals(aggregate.getResources().stream()
-              .map(e -> ResourceType.fromResource(IResource.build(e)))
-              .collect(Collectors.toSet()))) {
-
-        throw new IllegalArgumentException("Quota resources must be exactly: " + quotaResources);
-      }
-      aggregate.setNumCpus(
-              getResource(aggregate.getResources(), CPUS).getNumCpus());
-      aggregate.setRamMb(
-              getResource(aggregate.getResources(), RAM_MB).getRamMb());
-      aggregate.setDiskMb(
-              getResource(aggregate.getResources(), DISK_MB).getDiskMb());
-    }
-    return IResourceAggregate.build(aggregate);
-  }
-
-  private ScheduledTask backfillScheduledTask(ScheduledTask task) {
-    backfillTask(task.getAssignedTask().getTask());
-    return task;
-  }
-
-  /**
-   * Backfills JobUpdate. See {@link #backfillTask(TaskConfig)}.
-   *
-   * @param update JobUpdate to backfill.
-   * @return Backfilled job update.
-   */
-  IJobUpdate backFillJobUpdate(JobUpdate update) {
-    JobUpdateInstructions instructions = update.getInstructions();
-    if (instructions.isSetDesiredState()) {
-      backfillTask(instructions.getDesiredState().getTask());
-    }
-
-    instructions.getInitialState().forEach(e -> backfillTask(e.getTask()));
-
-    return IJobUpdate.build(update);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
deleted file mode 100644
index 41061f8..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveQuota;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdate;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.slf4j.Logger;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.storage.log.LogStorage.TransactionManager;
-
-/**
- * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
- * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
- * stores.
- */
-class WriteAheadStorage implements
-    MutableStoreProvider,
-    SchedulerStore.Mutable,
-    CronJobStore.Mutable,
-    TaskStore.Mutable,
-    QuotaStore.Mutable,
-    AttributeStore.Mutable,
-    JobUpdateStore.Mutable {
-
-  private final TransactionManager transactionManager;
-  private final SchedulerStore.Mutable schedulerStore;
-  private final CronJobStore.Mutable jobStore;
-  private final TaskStore.Mutable taskStore;
-  private final QuotaStore.Mutable quotaStore;
-  private final AttributeStore.Mutable attributeStore;
-  private final JobUpdateStore.Mutable jobUpdateStore;
-  private final Logger log;
-  private final EventSink eventSink;
-
-  /**
-   * Creates a new write-ahead storage that delegates to the providing default stores.
-   *
-   * @param transactionManager External controller for transaction operations.
-   * @param schedulerStore Delegate.
-   * @param jobStore       Delegate.
-   * @param taskStore      Delegate.
-   * @param quotaStore     Delegate.
-   * @param attributeStore Delegate.
-   * @param jobUpdateStore Delegate.
-   */
-  WriteAheadStorage(
-      TransactionManager transactionManager,
-      SchedulerStore.Mutable schedulerStore,
-      CronJobStore.Mutable jobStore,
-      TaskStore.Mutable taskStore,
-      QuotaStore.Mutable quotaStore,
-      AttributeStore.Mutable attributeStore,
-      JobUpdateStore.Mutable jobUpdateStore,
-      Logger log,
-      EventSink eventSink) {
-
-    this.transactionManager = requireNonNull(transactionManager);
-    this.schedulerStore = requireNonNull(schedulerStore);
-    this.jobStore = requireNonNull(jobStore);
-    this.taskStore = requireNonNull(taskStore);
-    this.quotaStore = requireNonNull(quotaStore);
-    this.attributeStore = requireNonNull(attributeStore);
-    this.jobUpdateStore = requireNonNull(jobUpdateStore);
-    this.log = requireNonNull(log);
-    this.eventSink = requireNonNull(eventSink);
-  }
-
-  private void write(Op op) {
-    Preconditions.checkState(
-        transactionManager.hasActiveTransaction(),
-        "Mutating operations must be within a transaction.");
-    transactionManager.log(op);
-  }
-
-  @Override
-  public void saveFrameworkId(final String frameworkId) {
-    requireNonNull(frameworkId);
-
-    write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
-    schedulerStore.saveFrameworkId(frameworkId);
-  }
-
-  @Override
-  public void deleteTasks(final Set<String> taskIds) {
-    requireNonNull(taskIds);
-
-    write(Op.removeTasks(new RemoveTasks(taskIds)));
-    taskStore.deleteTasks(taskIds);
-  }
-
-  @Override
-  public void saveTasks(final Set<IScheduledTask> newTasks) {
-    requireNonNull(newTasks);
-
-    write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
-    taskStore.saveTasks(newTasks);
-  }
-
-  @Override
-  public Optional<IScheduledTask> mutateTask(
-      String taskId,
-      Function<IScheduledTask, IScheduledTask> mutator) {
-
-    Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
-    log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
-    write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
-
-    return mutated;
-  }
-
-  @Override
-  public void saveQuota(final String role, final IResourceAggregate quota) {
-    requireNonNull(role);
-    requireNonNull(quota);
-
-    write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
-    quotaStore.saveQuota(role, quota);
-  }
-
-  @Override
-  public boolean saveHostAttributes(final IHostAttributes attrs) {
-    requireNonNull(attrs);
-
-    boolean changed = attributeStore.saveHostAttributes(attrs);
-    if (changed) {
-      write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
-      eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
-    }
-    return changed;
-  }
-
-  @Override
-  public void removeJob(final IJobKey jobKey) {
-    requireNonNull(jobKey);
-
-    write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
-    jobStore.removeJob(jobKey);
-  }
-
-  @Override
-  public void saveAcceptedJob(final IJobConfiguration jobConfig) {
-    requireNonNull(jobConfig);
-
-    write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
-    jobStore.saveAcceptedJob(jobConfig);
-  }
-
-  @Override
-  public void removeQuota(final String role) {
-    requireNonNull(role);
-
-    write(Op.removeQuota(new RemoveQuota(role)));
-    quotaStore.removeQuota(role);
-  }
-
-  @Override
-  public void saveJobUpdate(IJobUpdate update) {
-    requireNonNull(update);
-
-    write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
-    jobUpdateStore.saveJobUpdate(update);
-  }
-
-  @Override
-  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
-    requireNonNull(key);
-    requireNonNull(event);
-
-    write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
-    jobUpdateStore.saveJobUpdateEvent(key, event);
-  }
-
-  @Override
-  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
-    requireNonNull(key);
-    requireNonNull(event);
-
-    write(Op.saveJobInstanceUpdateEvent(
-        new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
-    jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
-  }
-
-  @Override
-  public void removeJobUpdates(Set<IJobUpdateKey> keys) {
-    requireNonNull(keys);
-
-    // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
-    // read it.  JobUpdates are only removed implicitly when a snapshot is taken.
-    jobUpdateStore.removeJobUpdates(keys);
-  }
-
-  @Override
-  public void deleteAllTasks() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteHostAttributes() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteJobs() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteQuotas() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteAllUpdates() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public SchedulerStore.Mutable getSchedulerStore() {
-    return this;
-  }
-
-  @Override
-  public CronJobStore.Mutable getCronJobStore() {
-    return this;
-  }
-
-  @Override
-  public TaskStore.Mutable getUnsafeTaskStore() {
-    return this;
-  }
-
-  @Override
-  public QuotaStore.Mutable getQuotaStore() {
-    return this;
-  }
-
-  @Override
-  public AttributeStore.Mutable getAttributeStore() {
-    return this;
-  }
-
-  @Override
-  public TaskStore getTaskStore() {
-    return this;
-  }
-
-  @Override
-  public JobUpdateStore.Mutable getJobUpdateStore() {
-    return this;
-  }
-
-  @Override
-  public Optional<String> fetchFrameworkId() {
-    return this.schedulerStore.fetchFrameworkId();
-  }
-
-  @Override
-  public Iterable<IJobConfiguration> fetchJobs() {
-    return this.jobStore.fetchJobs();
-  }
-
-  @Override
-  public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
-    return this.jobStore.fetchJob(jobKey);
-  }
-
-  @Override
-  public Optional<IScheduledTask> fetchTask(String taskId) {
-    return this.taskStore.fetchTask(taskId);
-  }
-
-  @Override
-  public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
-    return this.taskStore.fetchTasks(query);
-  }
-
-  @Override
-  public Set<IJobKey> getJobKeys() {
-    return this.taskStore.getJobKeys();
-  }
-
-  @Override
-  public Optional<IResourceAggregate> fetchQuota(String role) {
-    return this.quotaStore.fetchQuota(role);
-  }
-
-  @Override
-  public Map<String, IResourceAggregate> fetchQuotas() {
-    return this.quotaStore.fetchQuotas();
-  }
-
-  @Override
-  public Optional<IHostAttributes> getHostAttributes(String host) {
-    return this.attributeStore.getHostAttributes(host);
-  }
-
-  @Override
-  public Set<IHostAttributes> getHostAttributes() {
-    return this.attributeStore.getHostAttributes();
-  }
-
-  @Override
-  public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
-    return this.jobUpdateStore.fetchJobUpdates(query);
-  }
-
-  @Override
-  public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
-    return this.jobUpdateStore.fetchJobUpdate(key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 2cc567d..a519b07 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -83,11 +83,13 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.UUIDGenerator;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IHostStatus;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -99,7 +101,6 @@ import org.apache.aurora.scheduler.storage.entities.IMetadata;
 import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 import org.apache.aurora.scheduler.thrift.aop.AnnotatedAuroraAdmin;
 import org.apache.aurora.scheduler.thrift.aop.ThriftWorkload;
 import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift;
@@ -167,6 +168,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
   private final ConfigurationManager configurationManager;
   private final Thresholds thresholds;
   private final NonVolatileStorage storage;
+  private final DistributedSnapshotStore snapshotStore;
   private final StorageBackup backup;
   private final Recovery recovery;
   private final MaintenanceController maintenance;
@@ -195,6 +197,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
       ConfigurationManager configurationManager,
       Thresholds thresholds,
       NonVolatileStorage storage,
+      DistributedSnapshotStore snapshotStore,
       StorageBackup backup,
       Recovery recovery,
       CronJobManager cronJobManager,
@@ -211,6 +214,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
     this.configurationManager = requireNonNull(configurationManager);
     this.thresholds = requireNonNull(thresholds);
     this.storage = requireNonNull(storage);
+    this.snapshotStore = requireNonNull(snapshotStore);
     this.backup = requireNonNull(backup);
     this.recovery = requireNonNull(recovery);
     this.maintenance = requireNonNull(maintenance);
@@ -635,7 +639,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
 
   @Override
   public Response snapshot() {
-    storage.snapshot();
+    snapshotStore.snapshot();
     return ok();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
index 8cf6871..e82b637 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
@@ -36,11 +36,6 @@ public class FakeNonVolatileStorage implements NonVolatileStorage {
   }
 
   @Override
-  public void snapshot() throws StorageException {
-    // No-op.
-  }
-
-  @Override
   public void start(Quiet initializationLogic) throws StorageException {
     // No-op.
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index c639ab6..aeb8685 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -26,6 +26,7 @@ import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.util.Modules;
 
+import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.TierModule;
 import org.apache.aurora.scheduler.app.SchedulerMain;
 import org.apache.aurora.scheduler.app.local.simulator.ClusterSimulatorModule;
@@ -82,7 +83,17 @@ public final class LocalSchedulerMain {
       protected void configure() {
         bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class));
         bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
-        bind(DistributedSnapshotStore.class).toInstance(snapshot -> { });
+        bind(DistributedSnapshotStore.class).toInstance(new DistributedSnapshotStore() {
+          @Override
+          public void snapshot() throws Storage.StorageException {
+            // no-op
+          }
+
+          @Override
+          public void snapshotWith(Snapshot snapshot) {
+            // no-op
+          }
+        });
       }
     };
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
index 7138d6b..09560f4 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
@@ -98,7 +98,7 @@ public class RecoveryTest extends EasyMockTest {
     Capture<MutateWork<Object, Exception>> transaction = createCapture();
     expect(primaryStorage.write(capture(transaction))).andReturn(null);
     Capture<Snapshot> snapshot = createCapture();
-    distributedStore.persist(capture(snapshot));
+    distributedStore.snapshotWith(capture(snapshot));
     shutDownNow.execute();
 
     control.replay();
@@ -127,7 +127,7 @@ public class RecoveryTest extends EasyMockTest {
     Capture<MutateWork<Object, Exception>> transaction = createCapture();
     expect(primaryStorage.write(capture(transaction))).andReturn(null);
     Capture<Snapshot> snapshot = createCapture();
-    distributedStore.persist(capture(snapshot));
+    distributedStore.snapshotWith(capture(snapshot));
     shutDownNow.execute();
 
     control.replay();