You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/03 03:59:13 UTC

[4/4] aurora git commit: Extract a storage Persistence layer

Extract a storage Persistence layer

This extracts the `Log`- and `Snapshot`-specific details from `LogStorage`,
leaving `DurableStorage`.  `DurableStorage` is useful as a general-purpose
`Storage` mutation observer, with `Persistence` being the minimal behavior
that an underlying durability layer needs to provide.

Reviewed at https://reviews.apache.org/r/64234/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/cea43db9
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/cea43db9
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/cea43db9

Branch: refs/heads/master
Commit: cea43db9ded1201f69a85a43fb67244c69cf5347
Parents: de8b375
Author: Bill Farner <wf...@apache.org>
Authored: Sat Dec 2 19:59:03 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Sat Dec 2 19:59:03 2017 -0800

----------------------------------------------------------------------
 .../apache/aurora/codec/ThriftBinaryCodec.java  |   2 +-
 .../aurora/scheduler/base/TaskTestUtil.java     |   2 +-
 .../configuration/ConfigurationManager.java     |   2 +-
 .../scheduler/resources/ResourceManager.java    |   2 +-
 .../storage/CallOrderEnforcingStorage.java      |   6 -
 .../storage/DistributedSnapshotStore.java       |  15 +-
 .../aurora/scheduler/storage/Storage.java       |  10 -
 .../scheduler/storage/backup/Recovery.java      |   2 +-
 .../storage/backup/TemporaryStorage.java        |   2 +-
 .../storage/durability/DurableStorage.java      | 350 ++++++++
 .../storage/durability/Persistence.java         |  64 ++
 .../storage/durability/ThriftBackfill.java      | 175 ++++
 .../storage/durability/TransactionRecorder.java | 122 +++
 .../storage/durability/WriteAheadStorage.java   | 368 ++++++++
 .../scheduler/storage/log/LogPersistence.java   | 257 ++++++
 .../scheduler/storage/log/LogStorage.java       | 576 ------------
 .../scheduler/storage/log/LogStorageModule.java |  13 +-
 .../storage/log/SnapshotStoreImpl.java          |   1 +
 .../scheduler/storage/log/StreamManager.java    |  15 +-
 .../storage/log/StreamManagerImpl.java          |  46 +-
 .../scheduler/storage/log/ThriftBackfill.java   | 175 ----
 .../storage/log/WriteAheadStorage.java          | 369 --------
 .../thrift/SchedulerThriftInterface.java        |   8 +-
 .../app/local/FakeNonVolatileStorage.java       |   5 -
 .../scheduler/app/local/LocalSchedulerMain.java |  13 +-
 .../scheduler/storage/backup/RecoveryTest.java  |   4 +-
 .../storage/durability/DurableStorageTest.java  | 781 ++++++++++++++++
 .../storage/durability/ThriftBackfillTest.java  | 222 +++++
 .../durability/TransactionRecorderTest.java     |  78 ++
 .../durability/WriteAheadStorageTest.java       | 166 ++++
 .../scheduler/storage/log/LogManagerTest.java   |  86 +-
 .../scheduler/storage/log/LogStorageTest.java   | 897 -------------------
 .../storage/log/NonVolatileStorageTest.java     |   5 +-
 .../storage/log/SnapshotStoreImplIT.java        |   1 +
 .../storage/log/ThriftBackfillTest.java         | 222 -----
 .../storage/log/WriteAheadStorageTest.java      | 165 ----
 .../thrift/SchedulerThriftInterfaceTest.java    |   8 +-
 .../aurora/scheduler/thrift/ThriftIT.java       |   2 +
 38 files changed, 2687 insertions(+), 2550 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
index 3c12532..cdbe359 100644
--- a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
+++ b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
@@ -217,7 +217,7 @@ public final class ThriftBinaryCodec {
   /**
    * Thrown when serialization or deserialization failed.
    */
-  public static class CodingException extends Exception {
+  public static class CodingException extends RuntimeException {
     public CodingException(String message) {
       super(message);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 5fe7b9b..e1f20f4 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -47,10 +47,10 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 import org.apache.mesos.v1.Protos;
 import org.apache.mesos.v1.Protos.ExecutorID;
 import org.apache.mesos.v1.Protos.ExecutorInfo;

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index fa2f39c..f3e98f2 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -40,6 +40,7 @@ import org.apache.aurora.scheduler.base.UserProvidedStrings;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IContainer;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -48,7 +49,6 @@ import org.apache.aurora.scheduler.storage.entities.IResource;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
 import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 
 import static java.util.Objects.requireNonNull;
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
index f9dee22..d093753 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -26,12 +26,12 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IResource;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 import org.apache.mesos.v1.Protos.Resource;
 
 import static org.apache.aurora.scheduler.resources.ResourceType.BY_MESOS_NAME;

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index 1b10ec5..25fd315 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -132,12 +132,6 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
     return wrapped.write(work);
   }
 
-  @Override
-  public void snapshot() throws StorageException {
-    checkState(State.READY);
-    wrapped.snapshot();
-  }
-
   /**
    * Creates a binding module that will wrap a storage class with {@link CallOrderEnforcingStorage},
    * exposing the order-enforced storage as {@link Storage} and {@link NonVolatileStorage}.

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
index 4ddee40..0c6a955 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
@@ -15,18 +15,25 @@ package org.apache.aurora.scheduler.storage;
 
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
 
 /**
  * A distributed snapshot store that supports persisting globally-visible snapshots.
  */
 public interface DistributedSnapshotStore {
+
+  /**
+   * Clean up the underlying storage by optimizing internal data structures. Does not change
+   * externally-visible state but might not run concurrently with write operations.
+   */
+  void snapshot() throws StorageException;
+
   /**
-   * Writes a snapshot to the distributed storage system.
-   * TODO(William Farner): Currently we're hiding some exceptions (which happen to be
-   * RuntimeExceptions).  Clean these up to be checked, and throw another exception type here.
+   * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an
+   * internally-generated one based on the current state.
    *
    * @param snapshot Snapshot to write.
    * @throws CodingException If the snapshot could not be serialized.
    */
-  void persist(Snapshot snapshot) throws CodingException;
+  void snapshotWith(Snapshot snapshot) throws CodingException;
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 7d325b6..c9ea1de 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -196,10 +196,6 @@ public interface Storage {
    * Executes the unit of read-only {@code work}.  The consistency model creates the possibility
    * for a reader to read uncommitted state from a concurrent writer.
    * <p>
-   * TODO(wfarner): Update this documentation once all stores are backed by
-   * {@link org.apache.aurora.scheduler.storage.db.DbStorage}, as the concurrency behavior will then
-   * be dictated by the {@link org.mybatis.guice.transactional.Transactional#isolation()} used.
-   * <p>
    * TODO(wfarner): This method no longer needs to exist now that there is no global locking for
    * reads.  We could instead directly inject the individual stores where they are used, as long
    * as the stores have a layer to replicate what is currently done by
@@ -253,12 +249,6 @@ public interface Storage {
     void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException;
 
     /**
-     * Clean up the underlying storage by optimizing internal data structures. Does not change
-     * externally-visible state but might not run concurrently with write operations.
-     */
-    void snapshot() throws StorageException;
-
-    /**
      * Prepares the underlying storage system for clean shutdown.
      */
     void stop();

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
index 6cd5b2b..3a62f02 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -197,7 +197,7 @@ public interface Recovery {
       void commit() {
         primaryStorage.write((NoResult.Quiet) storeProvider -> {
           try {
-            distributedStore.persist(tempStorage.toSnapshot());
+            distributedStore.snapshotWith(tempStorage.toSnapshot());
             shutDownNow.execute();
           } catch (CodingException e) {
             throw new IllegalStateException("Failed to encode snapshot.", e);

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 3000796..18296b0 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -27,9 +27,9 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 
 import static java.util.Objects.requireNonNull;

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
new file mode 100644
index 0000000..85b2113
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A storage implementation that ensures storage mutations are written to a persistence layer.
+ *
+ * <p>In the classic write-ahead log usage we'd perform mutations as follows:
+ * <ol>
+ *   <li>record op</li>
+ *   <li>perform op locally</li>
+ *   <li>persist ops</li>
+ * </ol>
+ *
+ * <p>Writing the operation to persistence ensures we have a record of our mutation in case we
+ * should need to recover state later after a crash or on a new host (assuming the scheduler is
+ * distributed).  We then apply the mutation to a local (in-memory) data structure for serving fast
+ * read requests.
+ *
+ * <p>This implementation leverages a local transaction to handle this:
+ * <ol>
+ *   <li>start local transaction</li>
+ *   <li>perform op locally (uncommitted!)</li>
+ *   <li>write op to persistence</li>
+ * </ol>
+ *
+ * <p>If the op fails to apply to local storage we will never persist the op, and if the op
+ * fails to persist, it'll throw and abort the local storage operation as well.
+ */
+public class DurableStorage implements NonVolatileStorage {
+
+  /**
+   * A maintainer for context about open transactions. Assumes that an external entity is
+   * responsible for opening and closing transactions.
+   */
+  interface TransactionManager {
+
+    /**
+     * Checks whether there is an open transaction.
+     *
+     * @return {@code true} if there is an open transaction, {@code false} otherwise.
+     */
+    boolean hasActiveTransaction();
+
+    /**
+     * Adds an operation to the existing transaction.
+     *
+     * @param op Operation to include in the existing transaction.
+     */
+    void log(Op op);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(DurableStorage.class);
+
+  private final Persistence persistence;
+  private final Storage writeBehindStorage;
+  private final SchedulerStore.Mutable writeBehindSchedulerStore;
+  private final CronJobStore.Mutable writeBehindJobStore;
+  private final TaskStore.Mutable writeBehindTaskStore;
+  private final QuotaStore.Mutable writeBehindQuotaStore;
+  private final AttributeStore.Mutable writeBehindAttributeStore;
+  private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
+  private final ReentrantLock writeLock;
+  private final ThriftBackfill thriftBackfill;
+
+  private final WriteAheadStorage writeAheadStorage;
+
+  // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
+  // recovering are controlled at this layer (they're all calls to Mutable store implementations).
+  // The more involved change is changing SnapshotStore to accept a Mutable store provider to
+  // avoid a call to Storage.write() when we replay a Snapshot.
+  private boolean recovered = false;
+  private TransactionRecorder transaction = null;
+
+  private final SlidingStats writerWaitStats = new SlidingStats("storage_write_lock_wait", "ns");
+
+  private final Map<Op._Fields, Consumer<Op>> transactionReplayActions;
+
+  @Inject
+  DurableStorage(
+      Persistence persistence,
+      @Volatile Storage delegateStorage,
+      @Volatile SchedulerStore.Mutable schedulerStore,
+      @Volatile CronJobStore.Mutable jobStore,
+      @Volatile TaskStore.Mutable taskStore,
+      @Volatile QuotaStore.Mutable quotaStore,
+      @Volatile AttributeStore.Mutable attributeStore,
+      @Volatile JobUpdateStore.Mutable jobUpdateStore,
+      EventSink eventSink,
+      ReentrantLock writeLock,
+      ThriftBackfill thriftBackfill) {
+
+    this.persistence = requireNonNull(persistence);
+
+    // DurableStorage has two distinct operating modes: pre- and post-recovery.  When recovering,
+    // we write directly to the writeBehind stores since we are replaying what's already persisted.
+    // After that, all writes must succeed in Persistence before they may be considered successful.
+    this.writeBehindStorage = requireNonNull(delegateStorage);
+    this.writeBehindSchedulerStore = requireNonNull(schedulerStore);
+    this.writeBehindJobStore = requireNonNull(jobStore);
+    this.writeBehindTaskStore = requireNonNull(taskStore);
+    this.writeBehindQuotaStore = requireNonNull(quotaStore);
+    this.writeBehindAttributeStore = requireNonNull(attributeStore);
+    this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
+    this.writeLock = requireNonNull(writeLock);
+    this.thriftBackfill = requireNonNull(thriftBackfill);
+    TransactionManager transactionManager = new TransactionManager() {
+      @Override
+      public boolean hasActiveTransaction() {
+        return transaction != null;
+      }
+
+      @Override
+      public void log(Op op) {
+        transaction.add(op);
+      }
+    };
+    this.writeAheadStorage = new WriteAheadStorage(
+        transactionManager,
+        schedulerStore,
+        jobStore,
+        taskStore,
+        quotaStore,
+        attributeStore,
+        jobUpdateStore,
+        LoggerFactory.getLogger(WriteAheadStorage.class),
+        eventSink);
+
+    this.transactionReplayActions = buildTransactionReplayActions();
+  }
+
+  @VisibleForTesting
+  final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() {
+    return ImmutableMap.<Op._Fields, Consumer<Op>>builder()
+        .put(
+            Op._Fields.SAVE_FRAMEWORK_ID,
+            op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
+        .put(Op._Fields.SAVE_CRON_JOB, op -> {
+          SaveCronJob cronJob = op.getSaveCronJob();
+          writeBehindJobStore.saveAcceptedJob(
+              thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig()));
+        })
+        .put(
+            Op._Fields.REMOVE_JOB,
+            op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())))
+        .put(
+            Op._Fields.SAVE_TASKS,
+            op -> writeBehindTaskStore.saveTasks(
+                thriftBackfill.backfillTasks(op.getSaveTasks().getTasks())))
+        .put(
+            Op._Fields.REMOVE_TASKS,
+            op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()))
+        .put(Op._Fields.SAVE_QUOTA, op -> {
+          SaveQuota saveQuota = op.getSaveQuota();
+          writeBehindQuotaStore.saveQuota(
+              saveQuota.getRole(),
+              ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
+        })
+        .put(
+            Op._Fields.REMOVE_QUOTA,
+            op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()))
+        .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> {
+          HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
+          // Prior to commit 5cf760b, the store would persist maintenance mode changes for
+          // unknown hosts.  5cf760b began rejecting these, but the storage may still
+          // contain entries with a null slave ID.
+          if (attributes.isSetSlaveId()) {
+            writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
+          } else {
+            LOG.info("Dropping host attributes with no agent ID: " + attributes);
+          }
+        })
+        .put(Op._Fields.SAVE_JOB_UPDATE, op ->
+          writeBehindJobUpdateStore.saveJobUpdate(
+              thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())))
+        .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
+          SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
+          writeBehindJobUpdateStore.saveJobUpdateEvent(
+              IJobUpdateKey.build(event.getKey()),
+              IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
+        })
+        .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> {
+          SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
+          writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
+              IJobUpdateKey.build(event.getKey()),
+              IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
+        })
+        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
+          LOG.info("Dropping prune operation.  Updates will be pruned later.");
+        })
+        .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
+          writeBehindJobUpdateStore.removeJobUpdates(
+              IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
+        .build();
+  }
+
+  @Override
+  @Timed("scheduler_storage_prepare")
+  public synchronized void prepare() {
+    writeBehindStorage.prepare();
+    persistence.prepare();
+  }
+
+  @Override
+  @Timed("scheduler_storage_start")
+  public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
+    write((NoResult.Quiet) unused -> {
+      // Must have the underlying storage started so we can query it.
+      // We replay these entries in the forwarded storage system's transactions but not ours - we
+      // do not want to re-record these ops.
+      recover();
+      recovered = true;
+
+      // Now that we're recovered we should persist any mutations done in initializationLogic, so
+      // run it in one of our transactions.
+      write(initializationLogic);
+    });
+  }
+
+  @Override
+  public void stop() {
+    // No-op.
+  }
+
+  @Timed("scheduler_storage_recover")
+  void recover() throws RecoveryFailedException {
+    try {
+      persistence.recover().forEach(DurableStorage.this::replayOp);
+    } catch (PersistenceException e) {
+      throw new RecoveryFailedException(e);
+    }
+  }
+
+  private static final class RecoveryFailedException extends SchedulerException {
+    RecoveryFailedException(Throwable cause) {
+      super(cause);
+    }
+  }
+
+  private void replayOp(Op op) {
+    Op._Fields opField = op.getSetField();
+    if (!transactionReplayActions.containsKey(opField)) {
+      throw new IllegalStateException("Unknown transaction op: " + opField);
+    }
+
+    transactionReplayActions.get(opField).accept(op);
+  }
+
+  private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
+      throws StorageException, E {
+
+    // The transaction has already been set up so we just need to delegate with our store provider
+    // so any mutations may be persisted.
+    if (transaction != null) {
+      return work.apply(writeAheadStorage);
+    }
+
+    transaction = new TransactionRecorder();
+    try {
+      return writeBehindStorage.write(unused -> {
+        T result = work.apply(writeAheadStorage);
+        List<Op> ops = transaction.getOps();
+        if (!ops.isEmpty()) {
+          try {
+            persistence.persist(ops.stream());
+          } catch (PersistenceException e) {
+            throw new StorageException("Failed to persist storage changes", e);
+          }
+        }
+        return result;
+      });
+    } finally {
+      transaction = null;
+    }
+  }
+
+  @Override
+  public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
+    long waitStart = System.nanoTime();
+    writeLock.lock();
+    try {
+      writerWaitStats.accumulate(System.nanoTime() - waitStart);
+      // We don't want to persist when recovering, we just want to update the underlying
+      // store - so pass mutations straight through to the underlying storage.
+      if (!recovered) {
+        return writeBehindStorage.write(work);
+      }
+
+      return doInTransaction(work);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
+    return writeBehindStorage.read(work);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
new file mode 100644
index 0000000..9eb862c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.stream.Stream;
+
+import org.apache.aurora.gen.storage.Op;
+
+/**
+ * Persistence layer for storage operations.
+ */
+public interface Persistence {
+
+  /**
+   * Prepares the persistence layer.  The implementation may use this, for example, to advertise as
+   * a replica to cohort schedulers, or begin syncing state for warm standby.
+   */
+  void prepare();
+
+  /**
+   * Recovers previously-persisted records.
+   *
+   * @return All persisted records.
+   * @throws PersistenceException If recovery failed.
+   */
+  Stream<Op> recover() throws PersistenceException;
+
+  /**
+   * Saves new records.  No records may be considered durably saved until this method returns
+   * successfully.
+   *
+   * @param records Records to save.
+   * @throws PersistenceException If the records could not be saved.
+   */
+  void persist(Stream<Op> records) throws PersistenceException;
+
+  /**
+   * Thrown when a persistence operation fails.
+   */
+  class PersistenceException extends Exception {
+    public PersistenceException(String msg) {
+      super(msg);
+    }
+
+    public PersistenceException(Throwable cause) {
+      super(cause);
+    }
+
+    public PersistenceException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
new file mode 100644
index 0000000..4425d02
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.inject.Inject;
+
+import org.apache.aurora.GuavaUtils;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.Resource;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TierInfo;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IResource;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+
+/**
+ * Helps migrating thrift schema by populating deprecated and/or replacement fields.
+ *
+ * <p>The backfill methods mutate the thrift builder objects they are handed, in place.
+ */
+public final class ThriftBackfill {
+
+  private final TierManager tierManager;
+
+  @Inject
+  public ThriftBackfill(TierManager tierManager) {
+    this.tierManager = requireNonNull(tierManager);
+  }
+
+  /**
+   * Finds the first resource of the requested type.
+   *
+   * @param resources Resources to search.
+   * @param type Resource type to look for.
+   * @return The first resource whose type equals {@code type}.
+   * @throws IllegalArgumentException If no resource of that type is present.
+   */
+  private static Resource getResource(Set<Resource> resources, ResourceType type) {
+    return resources.stream()
+            .filter(e -> ResourceType.fromResource(IResource.build(e)).equals(type))
+            .findFirst()
+            .orElseThrow(() ->
+                    new IllegalArgumentException("Missing resource definition for " + type));
+  }
+
+  /**
+   * Ensures the {@code tier} and {@code production} fields of a TaskConfig are both populated and
+   * consistent with each other.  The config is modified in place and the same instance is
+   * returned.
+   *
+   * @param config TaskConfig to backfill.
+   * @return Backfilled TaskConfig.
+   * @throws IllegalStateException If no tier is set and no implicit tier matches the task.
+   */
+  public TaskConfig backfillTask(TaskConfig config) {
+    backfillTier(config);
+    return config;
+  }
+
+  /**
+   * Derives whichever of {@code tier}/{@code production} is missing from the other.
+   *
+   * <p>When a tier is set, {@code production} is overwritten with "neither preemptible nor
+   * revocable" for that tier.  Otherwise the tier is set to the first configured tier that is not
+   * revocable and whose preemptible flag is the opposite of {@code production}.
+   */
+  private void backfillTier(TaskConfig config) {
+    // Immutable view built up front; it reflects the config as it was before backfilling.
+    ITaskConfig taskConfig = ITaskConfig.build(config);
+    if (config.isSetTier()) {
+      TierInfo tier = tierManager.getTier(taskConfig);
+      config.setProduction(!tier.isPreemptible() && !tier.isRevocable());
+    } else {
+      config.setTier(tierManager.getTiers()
+          .entrySet()
+          .stream()
+          .filter(e -> e.getValue().isPreemptible() == !taskConfig.isProduction()
+              && !e.getValue().isRevocable())
+          .findFirst()
+          .orElseThrow(() -> new IllegalStateException(
+              format("No matching implicit tier for task of job %s", taskConfig.getJob())))
+          .getKey());
+    }
+  }
+
+  /**
+   * Backfills JobConfiguration. See {@link #backfillTask(TaskConfig)}.
+   *
+   * @param jobConfig JobConfiguration to backfill.
+   * @return Backfilled JobConfiguration.
+   */
+  public IJobConfiguration backfillJobConfiguration(JobConfiguration jobConfig) {
+    backfillTask(jobConfig.getTaskConfig());
+    return IJobConfiguration.build(jobConfig);
+  }
+
+  /**
+   * Backfills set of tasks. See {@link #backfillTask(TaskConfig)}.
+   *
+   * @param tasks Set of tasks to backfill.
+   * @return Backfilled set of tasks.
+   */
+  public Set<IScheduledTask> backfillTasks(Set<ScheduledTask> tasks) {
+    return tasks.stream()
+        .map(this::backfillScheduledTask)
+        .map(IScheduledTask::build)
+        .collect(GuavaUtils.toImmutableSet());
+  }
+
+  /**
+   * Ensures ResourceAggregate.resources and correspondent deprecated fields are all populated.
+   *
+   * <p>When {@code resources} is unset or empty it is filled from the {@code numCpus},
+   * {@code ramMb} and {@code diskMb} fields.  Otherwise the resource types present must be exactly
+   * {@link QuotaManager#QUOTA_RESOURCE_TYPES}, and those three fields are overwritten from the
+   * matching resources.  The aggregate is modified in place.
+   *
+   * @param aggregate ResourceAggregate to backfill.
+   * @return Backfilled IResourceAggregate.
+   * @throws IllegalArgumentException If {@code resources} is populated with more entries than, or
+   *     a different set of types from, the quota resource types.
+   */
+  public static IResourceAggregate backfillResourceAggregate(ResourceAggregate aggregate) {
+    if (!aggregate.isSetResources() || aggregate.getResources().isEmpty()) {
+      aggregate.addToResources(Resource.numCpus(aggregate.getNumCpus()));
+      aggregate.addToResources(Resource.ramMb(aggregate.getRamMb()));
+      aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb()));
+    } else {
+      EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES;
+      if (aggregate.getResources().size() > quotaResources.size()) {
+        throw new IllegalArgumentException("Too many resource values in quota.");
+      }
+
+      if (!quotaResources.equals(aggregate.getResources().stream()
+              .map(e -> ResourceType.fromResource(IResource.build(e)))
+              .collect(Collectors.toSet()))) {
+
+        throw new IllegalArgumentException("Quota resources must be exactly: " + quotaResources);
+      }
+      aggregate.setNumCpus(
+              getResource(aggregate.getResources(), CPUS).getNumCpus());
+      aggregate.setRamMb(
+              getResource(aggregate.getResources(), RAM_MB).getRamMb());
+      aggregate.setDiskMb(
+              getResource(aggregate.getResources(), DISK_MB).getDiskMb());
+    }
+    return IResourceAggregate.build(aggregate);
+  }
+
+  /**
+   * Backfills the TaskConfig nested in a scheduled task, returning the same (mutated) instance.
+   */
+  private ScheduledTask backfillScheduledTask(ScheduledTask task) {
+    backfillTask(task.getAssignedTask().getTask());
+    return task;
+  }
+
+  /**
+   * Backfills JobUpdate. See {@link #backfillTask(TaskConfig)}.
+   *
+   * <p>The task of the desired state and the task of every initial-state entry are backfilled,
+   * each only when the corresponding field is set.
+   *
+   * @param update JobUpdate to backfill.
+   * @return Backfilled job update.
+   */
+  public IJobUpdate backFillJobUpdate(JobUpdate update) {
+    JobUpdateInstructions instructions = update.getInstructions();
+    if (instructions.isSetDesiredState()) {
+      backfillTask(instructions.getDesiredState().getTask());
+    }
+
+    // Guarded like desiredState: an unset initialState would otherwise be dereferenced as null.
+    if (instructions.isSetInitialState()) {
+      instructions.getInitialState().forEach(e -> backfillTask(e.getTask()));
+    }
+
+    return IJobUpdate.build(update);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
new file mode 100644
index 0000000..1c811e3
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+
+/**
+ * Records a sequence of mutations to the storage.
+ */
+class TransactionRecorder {
+  private final List<Op> ops = Lists.newArrayList();
+
+  void add(Op op) {
+    Op prior = Iterables.getLast(ops, null);
+    if (prior == null || !coalesce(prior, op)) {
+      ops.add(op);
+    }
+  }
+
+  List<Op> getOps() {
+    return ops;
+  }
+
+  /**
+   * Tries to coalesce a new op into the prior to compact the binary representation and increase
+   * batching.
+   *
+   * @param prior The previous op.
+   * @param next The next op to be added.
+   * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
+   */
+  private boolean coalesce(Op prior, Op next) {
+    if (!prior.isSet() && !next.isSet()) {
+      return false;
+    }
+
+    Op._Fields priorType = prior.getSetField();
+    if (!priorType.equals(next.getSetField())) {
+      return false;
+    }
+
+    switch (priorType) {
+      case SAVE_FRAMEWORK_ID:
+        prior.setSaveFrameworkId(next.getSaveFrameworkId());
+        return true;
+      case SAVE_TASKS:
+        coalesce(prior.getSaveTasks(), next.getSaveTasks());
+        return true;
+      case REMOVE_TASKS:
+        coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
+        return true;
+      case SAVE_HOST_ATTRIBUTES:
+        return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
+      default:
+        return false;
+    }
+  }
+
+  private void coalesce(SaveTasks prior, SaveTasks next) {
+    if (next.isSetTasks()) {
+      if (prior.isSetTasks()) {
+        // It is an expected invariant that an operation may reference a task (identified by
+        // task ID) no more than one time.  Therefore, to coalesce two SaveTasks operations,
+        // the most recent task definition overrides the prior operation.
+        Map<String, ScheduledTask> coalesced = Maps.newHashMap();
+        for (ScheduledTask task : prior.getTasks()) {
+          coalesced.put(task.getAssignedTask().getTaskId(), task);
+        }
+        for (ScheduledTask task : next.getTasks()) {
+          coalesced.put(task.getAssignedTask().getTaskId(), task);
+        }
+        prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
+      } else {
+        prior.setTasks(next.getTasks());
+      }
+    }
+  }
+
+  private void coalesce(RemoveTasks prior, RemoveTasks next) {
+    if (next.isSetTaskIds()) {
+      if (prior.isSetTaskIds()) {
+        prior.setTaskIds(ImmutableSet.<String>builder()
+            .addAll(prior.getTaskIds())
+            .addAll(next.getTaskIds())
+            .build());
+      } else {
+        prior.setTaskIds(next.getTaskIds());
+      }
+    }
+  }
+
+  private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
+    if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
+      prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
+      return true;
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
new file mode 100644
index 0000000..667db06
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.slf4j.Logger;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
+ * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
+ * stores.
+ */
+public class WriteAheadStorage implements
+    MutableStoreProvider,
+    SchedulerStore.Mutable,
+    CronJobStore.Mutable,
+    TaskStore.Mutable,
+    QuotaStore.Mutable,
+    AttributeStore.Mutable,
+    JobUpdateStore.Mutable {
+
+  private final TransactionManager transactionManager;
+  private final SchedulerStore.Mutable schedulerStore;
+  private final CronJobStore.Mutable jobStore;
+  private final TaskStore.Mutable taskStore;
+  private final QuotaStore.Mutable quotaStore;
+  private final AttributeStore.Mutable attributeStore;
+  private final JobUpdateStore.Mutable jobUpdateStore;
+  private final Logger log;
+  private final EventSink eventSink;
+
+  /**
+   * Creates a new write-ahead storage that delegates to the providing default stores.
+   *
+   * @param transactionManager External controller for transaction operations.
+   * @param schedulerStore Delegate.
+   * @param jobStore       Delegate.
+   * @param taskStore      Delegate.
+   * @param quotaStore     Delegate.
+   * @param attributeStore Delegate.
+   * @param jobUpdateStore Delegate.
+   */
+  public WriteAheadStorage(
+      TransactionManager transactionManager,
+      SchedulerStore.Mutable schedulerStore,
+      CronJobStore.Mutable jobStore,
+      TaskStore.Mutable taskStore,
+      QuotaStore.Mutable quotaStore,
+      AttributeStore.Mutable attributeStore,
+      JobUpdateStore.Mutable jobUpdateStore,
+      Logger log,
+      EventSink eventSink) {
+
+    this.transactionManager = requireNonNull(transactionManager);
+    this.schedulerStore = requireNonNull(schedulerStore);
+    this.jobStore = requireNonNull(jobStore);
+    this.taskStore = requireNonNull(taskStore);
+    this.quotaStore = requireNonNull(quotaStore);
+    this.attributeStore = requireNonNull(attributeStore);
+    this.jobUpdateStore = requireNonNull(jobUpdateStore);
+    this.log = requireNonNull(log);
+    this.eventSink = requireNonNull(eventSink);
+  }
+
+  private void write(Op op) {
+    Preconditions.checkState(
+        transactionManager.hasActiveTransaction(),
+        "Mutating operations must be within a transaction.");
+    transactionManager.log(op);
+  }
+
+  @Override
+  public void saveFrameworkId(final String frameworkId) {
+    requireNonNull(frameworkId);
+
+    write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+    schedulerStore.saveFrameworkId(frameworkId);
+  }
+
+  @Override
+  public void deleteTasks(final Set<String> taskIds) {
+    requireNonNull(taskIds);
+
+    write(Op.removeTasks(new RemoveTasks(taskIds)));
+    taskStore.deleteTasks(taskIds);
+  }
+
+  @Override
+  public void saveTasks(final Set<IScheduledTask> newTasks) {
+    requireNonNull(newTasks);
+
+    write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
+    taskStore.saveTasks(newTasks);
+  }
+
+  @Override
+  public Optional<IScheduledTask> mutateTask(
+      String taskId,
+      Function<IScheduledTask, IScheduledTask> mutator) {
+
+    Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
+    log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
+    write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+
+    return mutated;
+  }
+
+  @Override
+  public void saveQuota(final String role, final IResourceAggregate quota) {
+    requireNonNull(role);
+    requireNonNull(quota);
+
+    write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+    quotaStore.saveQuota(role, quota);
+  }
+
+  @Override
+  public boolean saveHostAttributes(final IHostAttributes attrs) {
+    requireNonNull(attrs);
+
+    boolean changed = attributeStore.saveHostAttributes(attrs);
+    if (changed) {
+      write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
+      eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
+    }
+    return changed;
+  }
+
+  @Override
+  public void removeJob(final IJobKey jobKey) {
+    requireNonNull(jobKey);
+
+    write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
+    jobStore.removeJob(jobKey);
+  }
+
+  @Override
+  public void saveAcceptedJob(final IJobConfiguration jobConfig) {
+    requireNonNull(jobConfig);
+
+    write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
+    jobStore.saveAcceptedJob(jobConfig);
+  }
+
+  @Override
+  public void removeQuota(final String role) {
+    requireNonNull(role);
+
+    write(Op.removeQuota(new RemoveQuota(role)));
+    quotaStore.removeQuota(role);
+  }
+
+  @Override
+  public void saveJobUpdate(IJobUpdate update) {
+    requireNonNull(update);
+
+    write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+    jobUpdateStore.saveJobUpdate(update);
+  }
+
+  @Override
+  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+    requireNonNull(key);
+    requireNonNull(event);
+
+    write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
+    jobUpdateStore.saveJobUpdateEvent(key, event);
+  }
+
+  @Override
+  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
+    requireNonNull(key);
+    requireNonNull(event);
+
+    write(Op.saveJobInstanceUpdateEvent(
+        new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
+    jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
+  }
+
+  @Override
+  public void removeJobUpdates(Set<IJobUpdateKey> keys) {
+    requireNonNull(keys);
+
+    // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
+    // read it.  JobUpdates are only removed implicitly when a snapshot is taken.
+    jobUpdateStore.removeJobUpdates(keys);
+  }
+
+  @Override
+  public void deleteAllTasks() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteHostAttributes() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteJobs() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteQuotas() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteAllUpdates() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public SchedulerStore.Mutable getSchedulerStore() {
+    return this;
+  }
+
+  @Override
+  public CronJobStore.Mutable getCronJobStore() {
+    return this;
+  }
+
+  @Override
+  public TaskStore.Mutable getUnsafeTaskStore() {
+    return this;
+  }
+
+  @Override
+  public QuotaStore.Mutable getQuotaStore() {
+    return this;
+  }
+
+  @Override
+  public AttributeStore.Mutable getAttributeStore() {
+    return this;
+  }
+
+  @Override
+  public TaskStore getTaskStore() {
+    return this;
+  }
+
+  @Override
+  public JobUpdateStore.Mutable getJobUpdateStore() {
+    return this;
+  }
+
+  @Override
+  public Optional<String> fetchFrameworkId() {
+    return this.schedulerStore.fetchFrameworkId();
+  }
+
+  @Override
+  public Iterable<IJobConfiguration> fetchJobs() {
+    return this.jobStore.fetchJobs();
+  }
+
+  @Override
+  public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
+    return this.jobStore.fetchJob(jobKey);
+  }
+
+  @Override
+  public Optional<IScheduledTask> fetchTask(String taskId) {
+    return this.taskStore.fetchTask(taskId);
+  }
+
+  @Override
+  public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
+    return this.taskStore.fetchTasks(query);
+  }
+
+  @Override
+  public Set<IJobKey> getJobKeys() {
+    return this.taskStore.getJobKeys();
+  }
+
+  @Override
+  public Optional<IResourceAggregate> fetchQuota(String role) {
+    return this.quotaStore.fetchQuota(role);
+  }
+
+  @Override
+  public Map<String, IResourceAggregate> fetchQuotas() {
+    return this.quotaStore.fetchQuotas();
+  }
+
+  @Override
+  public Optional<IHostAttributes> getHostAttributes(String host) {
+    return this.attributeStore.getHostAttributes(host);
+  }
+
+  @Override
+  public Set<IHostAttributes> getHostAttributes() {
+    return this.attributeStore.getHostAttributes();
+  }
+
+  @Override
+  public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
+    return this.jobUpdateStore.fetchJobUpdates(query);
+  }
+
+  @Override
+  public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
+    return this.jobUpdateStore.fetchJobUpdate(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
new file mode 100644
index 0000000..a0a6b6c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.common.application.ShutdownRegistry;
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.gen.storage.LogEntry;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Persistence layer that uses a replicated log.
+ *
+ * <p>{@link #prepare()} must be called before any other operation, since it opens the log stream
+ * that every other method uses.
+ */
+class LogPersistence implements Persistence, DistributedSnapshotStore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogPersistence.class);
+
+  private final LogManager logManager;
+  private final SnapshotStore<Snapshot> snapshotStore;
+  private final SchedulingService schedulingService;
+  private final Amount<Long, Time> snapshotInterval;
+  // Assigned by prepare(); null until then.
+  private StreamManager streamManager;
+
+  @Inject
+  LogPersistence(
+      Settings settings,
+      LogManager logManager,
+      SnapshotStore<Snapshot> snapshotStore,
+      ShutdownRegistry shutdownRegistry) {
+
+    this(new ScheduledExecutorSchedulingService(
+            shutdownRegistry,
+            settings.getShutdownGracePeriod()),
+        settings.getSnapshotInterval(),
+        logManager,
+        snapshotStore);
+  }
+
+  @VisibleForTesting
+  LogPersistence(
+      SchedulingService schedulingService,
+      Amount<Long, Time> snapshotInterval,
+      LogManager logManager,
+      SnapshotStore<Snapshot> snapshotStore) {
+
+    this.schedulingService = requireNonNull(schedulingService);
+    this.snapshotInterval = requireNonNull(snapshotInterval);
+    this.logManager = requireNonNull(logManager);
+    this.snapshotStore = requireNonNull(snapshotStore);
+  }
+
+  /**
+   * Opens the log.
+   *
+   * @throws IllegalStateException If the log could not be opened.
+   */
+  @Override
+  public void prepare() {
+    // Open the log to make a log replica available to the scheduler group.
+    try {
+      streamManager = logManager.open();
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to open the log, cannot continue", e);
+    }
+  }
+
+  /**
+   * Writes all mutations to the log as a single stream transaction.
+   *
+   * @param mutations Ops to save.
+   * @throws PersistenceException If the transaction could not be encoded.
+   */
+  @Override
+  public void persist(Stream<Op> mutations) throws PersistenceException {
+    StreamTransaction transaction = streamManager.startTransaction();
+    mutations.forEach(transaction::add);
+    try {
+      transaction.commit();
+    } catch (CodingException e) {
+      throw new PersistenceException(e);
+    }
+  }
+
+  /**
+   * Starts periodic snapshotting (if enabled) and replays the log from its beginning.
+   *
+   * <p>The returned stream is lazy: log entries are examined, and any snapshot entries applied to
+   * the snapshot store, only as the caller consumes the stream.  Failures that happen during that
+   * consumption are therefore not translated into {@link PersistenceException}.
+   *
+   * @return The ops of all transaction entries in the log, in log order.
+   * @throws PersistenceException If reading of the log could not be started.
+   */
+  @Override
+  public Stream<Op> recover() throws PersistenceException {
+    scheduleSnapshots();
+
+    try {
+      Iterator<LogEntry> entries = streamManager.readFromBeginning();
+      Iterable<LogEntry> iterableEntries = () -> entries;
+      return StreamSupport.stream(iterableEntries.spliterator(), false)
+          .flatMap(this::replay);
+    } catch (CodingException | InvalidPositionException | StreamAccessException e) {
+      throw new PersistenceException(e);
+    }
+  }
+
+  /**
+   * Translates one log entry into the ops it contributes to recovery.
+   *
+   * <p>No-op entries contribute nothing.  Snapshot entries are applied to the snapshot store
+   * immediately and contribute nothing.  Transaction entries contribute their ops.
+   *
+   * @param entry Log entry to translate.
+   * @return The ops of the entry, possibly none.
+   * @throws IllegalStateException If the entry is of any other type.
+   */
+  private Stream<Op> replay(LogEntry entry) {
+    LogEntry._Fields type = entry.getSetField();
+    if (type == LogEntry._Fields.NOOP) {
+      return Stream.empty();
+    }
+    if (type == LogEntry._Fields.SNAPSHOT) {
+      Snapshot snapshot = entry.getSnapshot();
+      LOG.info("Applying snapshot taken on {}", new Date(snapshot.getTimestamp()));
+      snapshotStore.applySnapshot(snapshot);
+      return Stream.empty();
+    }
+    if (type != LogEntry._Fields.TRANSACTION) {
+      throw new IllegalStateException("Unknown log entry type: " + type);
+    }
+    return entry.getTransaction().getOps().stream();
+  }
+
+  /**
+   * Schedules {@link #snapshot()} to run every {@code snapshotInterval}, unless the interval is
+   * not positive.  Snapshot failures are logged and do not propagate to the scheduling service.
+   */
+  private void scheduleSnapshots() {
+    if (snapshotInterval.getValue() > 0) {
+      schedulingService.doEvery(snapshotInterval, () -> {
+        try {
+          snapshot();
+        } catch (StorageException e) {
+          if (e.getCause() == null) {
+            LOG.warn("StorageException when attempting to snapshot.", e);
+          } else {
+            LOG.warn(e.getMessage(), e.getCause());
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Takes a snapshot, translating log-level failures into {@link StorageException}.
+   *
+   * @throws StorageException If the snapshot could not be encoded or written, or if entries
+   *     preceding it could not be truncated.
+   */
+  @Override
+  public void snapshot() throws StorageException {
+    try {
+      doSnapshot();
+    } catch (CodingException e) {
+      throw new StorageException("Failed to encode a snapshot", e);
+    } catch (InvalidPositionException e) {
+      throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
+    } catch (StreamAccessException e) {
+      throw new StorageException("Failed to create a snapshot", e);
+    }
+  }
+
+  /**
+   * Writes the given snapshot to the log stream.
+   *
+   * @param snapshot Snapshot to write.
+   * @throws CodingException If there is a problem encoding the snapshot.
+   * @throws InvalidPositionException If the log stream cursor is invalid.
+   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
+   */
+  @Timed("scheduler_log_snapshot_persist")
+  @Override
+  public void snapshotWith(Snapshot snapshot)
+      throws CodingException, InvalidPositionException, StreamAccessException {
+
+    streamManager.snapshot(snapshot);
+  }
+
+  /**
+   * Forces a snapshot of the storage state.
+   *
+   * @throws CodingException If there is a problem encoding the snapshot.
+   * @throws InvalidPositionException If the log stream cursor is invalid.
+   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
+   */
+  @Timed("scheduler_log_snapshot")
+  void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
+    LOG.info("Creating snapshot.");
+    Snapshot snapshot = snapshotStore.createSnapshot();
+    snapshotWith(snapshot);
+    LOG.info("Snapshot complete."
+            + " host attrs: {}, cron jobs: {}, quota confs: {}, tasks: {}, updates: {}",
+        snapshot.getHostAttributesSize(),
+        snapshot.getCronJobsSize(),
+        snapshot.getQuotaConfigurationsSize(),
+        snapshot.getTasksSize(),
+        snapshot.getJobUpdateDetailsSize());
+  }
+
+  /**
+   * A service that can schedule an action to be executed periodically.
+   */
+  @VisibleForTesting
+  interface SchedulingService {
+
+    /**
+     * Schedules an action to execute periodically.
+     *
+     * @param interval The time period to wait until running the {@code action} again.
+     * @param action The action to execute periodically.
+     */
+    void doEvery(Amount<Long, Time> interval, Runnable action);
+  }
+
+  /**
+   * Scheduling service backed by a single-threaded scheduled executor, which is shut down through
+   * the shutdown registry.
+   */
+  private static class ScheduledExecutorSchedulingService implements SchedulingService {
+    private final ScheduledExecutorService scheduledExecutor;
+
+    ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
+                                       Amount<Long, Time> shutdownGracePeriod) {
+      scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG);
+      shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination(
+          scheduledExecutor,
+          shutdownGracePeriod.getValue(),
+          shutdownGracePeriod.getUnit().getTimeUnit()));
+    }
+
+    @Override
+    public void doEvery(Amount<Long, Time> interval, Runnable action) {
+      requireNonNull(interval);
+      requireNonNull(action);
+
+      // The interval serves as both the initial delay and the delay between runs.
+      long delay = interval.getValue();
+      TimeUnit timeUnit = interval.getUnit().getTimeUnit();
+      scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
+    }
+  }
+
+  /**
+   * Configuration settings for log persistence.
+   */
+  public static class Settings {
+    private final Amount<Long, Time> shutdownGracePeriod;
+    private final Amount<Long, Time> snapshotInterval;
+
+    Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
+      this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
+      this.snapshotInterval = requireNonNull(snapshotInterval);
+    }
+
+    public Amount<Long, Time> getShutdownGracePeriod() {
+      return shutdownGracePeriod;
+    }
+
+    public Amount<Long, Time> getSnapshotInterval() {
+      return snapshotInterval;
+    }
+  }
+}