You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/03 03:59:10 UTC

[1/4] aurora git commit: Extract a storage Persistence layer

Repository: aurora
Updated Branches:
  refs/heads/master de8b37549 -> cea43db9d


http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
deleted file mode 100644
index 3c056c9..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ /dev/null
@@ -1,897 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Data;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.InstanceTaskConfig;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateAction;
-import org.apache.aurora.gen.JobUpdateEvent;
-import org.apache.aurora.gen.JobUpdateInstructions;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.JobUpdateSettings;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.JobUpdateSummary;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.Range;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.storage.DeduplicatedSnapshot;
-import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
-import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveJobUpdates;
-import org.apache.aurora.gen.storage.RemoveQuota;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdate;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.Transaction;
-import org.apache.aurora.gen.storage.storageConstants;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.log.Log;
-import org.apache.aurora.scheduler.log.Log.Entry;
-import org.apache.aurora.scheduler.log.Log.Position;
-import org.apache.aurora.scheduler.log.Log.Stream;
-import org.apache.aurora.scheduler.resources.ResourceTestUtil;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.log.LogStorage.SchedulingService;
-import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
-import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
-import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.Capture;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.Resource.diskMb;
-import static org.apache.aurora.gen.Resource.numCpus;
-import static org.apache.aurora.gen.Resource.ramMb;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeConfig;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.notNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class LogStorageTest extends EasyMockTest {
-
-  private static final Amount<Long, Time> SNAPSHOT_INTERVAL = Amount.of(1L, Time.MINUTES);
-  private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "name");
-  private static final IJobUpdateKey UPDATE_ID =
-      IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "testUpdateId"));
-  private static final long NOW = 42;
-
-  private LogStorage logStorage;
-  private Log log;
-  private SnapshotDeduplicator deduplicator;
-  private Stream stream;
-  private Position position;
-  private StreamMatcher streamMatcher;
-  private SchedulingService schedulingService;
-  private SnapshotStore<Snapshot> snapshotStore;
-  private StorageTestUtil storageUtil;
-  private EventSink eventSink;
-
-  @Before
-  public void setUp() {
-    log = createMock(Log.class);
-    deduplicator = createMock(SnapshotDeduplicator.class);
-
-    StreamManagerFactory streamManagerFactory = logStream -> {
-      HashFunction md5 = Hashing.md5();
-      return new StreamManagerImpl(
-          logStream,
-          new EntrySerializer.EntrySerializerImpl(Amount.of(1, Data.GB), md5),
-          md5,
-          deduplicator);
-    };
-    LogManager logManager = new LogManager(log, streamManagerFactory);
-
-    schedulingService = createMock(SchedulingService.class);
-    snapshotStore = createMock(new Clazz<SnapshotStore<Snapshot>>() { });
-    storageUtil = new StorageTestUtil(this);
-    eventSink = createMock(EventSink.class);
-
-    logStorage = new LogStorage(
-        logManager,
-        schedulingService,
-        snapshotStore,
-        SNAPSHOT_INTERVAL,
-        storageUtil.storage,
-        storageUtil.schedulerStore,
-        storageUtil.jobStore,
-        storageUtil.taskStore,
-        storageUtil.quotaStore,
-        storageUtil.attributeStore,
-        storageUtil.jobUpdateStore,
-        eventSink,
-        new ReentrantLock(),
-        TaskTestUtil.THRIFT_BACKFILL);
-
-    stream = createMock(Stream.class);
-    streamMatcher = LogOpMatcher.matcherFor(stream);
-    position = createMock(Position.class);
-
-    storageUtil.storage.prepare();
-  }
-
-  @Test
-  public void testStart() throws Exception {
-    // We should open the log and arrange for its clean shutdown.
-    expect(log.open()).andReturn(stream);
-
-    // Our start should recover the log and then forward to the underlying storage start of the
-    // supplied initialization logic.
-    AtomicBoolean initialized = new AtomicBoolean(false);
-    MutateWork.NoResult.Quiet initializationLogic = provider -> {
-      // Creating a mock and expecting apply(storeProvider) does not work here for whatever
-      // reason.
-      initialized.set(true);
-    };
-
-    Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
-    storageUtil.storage.write(capture(recoverAndInitializeWork));
-    expectLastCall().andAnswer(() -> {
-      recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
-      return null;
-    });
-
-    Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
-    expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
-        () -> {
-          recoveryWork.getValue().apply(storageUtil.mutableStoreProvider);
-          return null;
-        });
-
-    Capture<MutateWork<Void, RuntimeException>> initializationWork = createCapture();
-    expect(storageUtil.storage.write(capture(initializationWork))).andAnswer(
-        () -> {
-          initializationWork.getValue().apply(storageUtil.mutableStoreProvider);
-          return null;
-        });
-
-    // We should perform a snapshot when the snapshot thread runs.
-    Capture<Runnable> snapshotAction = createCapture();
-    schedulingService.doEvery(eq(SNAPSHOT_INTERVAL), capture(snapshotAction));
-    Snapshot snapshotContents = new Snapshot()
-        .setTimestamp(NOW)
-        .setTasks(ImmutableSet.of(makeTask("task_id", TaskTestUtil.JOB).newBuilder()));
-    expect(snapshotStore.createSnapshot()).andReturn(snapshotContents);
-    DeduplicatedSnapshot deduplicated =
-        new SnapshotDeduplicatorImpl().deduplicate(snapshotContents);
-    expect(deduplicator.deduplicate(snapshotContents)).andReturn(deduplicated);
-    streamMatcher.expectSnapshot(deduplicated).andReturn(position);
-    stream.truncateBefore(position);
-    Capture<MutateWork<Void, RuntimeException>> snapshotWork = createCapture();
-    expect(storageUtil.storage.write(capture(snapshotWork))).andAnswer(
-        () -> {
-          snapshotWork.getValue().apply(storageUtil.mutableStoreProvider);
-          return null;
-        }).anyTimes();
-
-    // Populate all LogEntry types.
-    buildReplayLogEntries();
-
-    control.replay();
-
-    logStorage.prepare();
-    logStorage.start(initializationLogic);
-    assertTrue(initialized.get());
-
-    // Run the snapshot thread.
-    snapshotAction.getValue().run();
-
-    // Assert all LogEntry types have handlers defined.
-    // Our current StreamManagerImpl.readFromBeginning() does not let some entries escape
-    // the decoding routine making handling them in replay unnecessary.
-    assertEquals(
-        Sets.complementOf(EnumSet.of(
-            LogEntry._Fields.FRAME,
-            LogEntry._Fields.DEDUPLICATED_SNAPSHOT,
-            LogEntry._Fields.DEFLATED_ENTRY)),
-        EnumSet.copyOf(logStorage.buildLogEntryReplayActions().keySet()));
-
-    // Assert all Transaction types have handlers defined.
-    assertEquals(
-        EnumSet.allOf(Op._Fields.class),
-        EnumSet.copyOf(logStorage.buildTransactionReplayActions().keySet()));
-  }
-
-  private void buildReplayLogEntries() throws Exception {
-    ImmutableSet.Builder<LogEntry> builder = ImmutableSet.builder();
-
-    builder.add(createTransaction(Op.saveFrameworkId(new SaveFrameworkId("bob"))));
-    storageUtil.schedulerStore.saveFrameworkId("bob");
-
-    JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig());
-    JobConfiguration expectedJob =
-        new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder());
-    SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob);
-    builder.add(createTransaction(Op.saveCronJob(cronJob)));
-    storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob));
-
-    RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
-    builder.add(createTransaction(Op.removeJob(removeJob)));
-    storageUtil.jobStore.removeJob(JOB_KEY);
-
-    ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder();
-    actualTask.getAssignedTask().setTask(nonBackfilledConfig());
-    IScheduledTask expectedTask = makeTask("id", JOB_KEY);
-    SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask));
-    builder.add(createTransaction(Op.saveTasks(saveTasks)));
-    storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask));
-
-    RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
-    builder.add(createTransaction(Op.removeTasks(removeTasks)));
-    storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
-
-    ResourceAggregate nonBackfilled = new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(32)
-        .setDiskMb(64);
-    SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled);
-    builder.add(createTransaction(Op.saveQuota(saveQuota)));
-    storageUtil.quotaStore.saveQuota(
-        saveQuota.getRole(),
-        IResourceAggregate.build(nonBackfilled.deepCopy()
-            .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))));
-
-    builder.add(createTransaction(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole()))));
-    storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
-
-    // This entry lacks a slave ID, and should therefore be discarded.
-    SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
-        .setHost("host1")
-        .setMode(MaintenanceMode.DRAINED));
-    builder.add(createTransaction(Op.saveHostAttributes(hostAttributes1)));
-
-    SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
-        .setHost("host2")
-        .setSlaveId("slave2")
-        .setMode(MaintenanceMode.DRAINED));
-    builder.add(createTransaction(Op.saveHostAttributes(hostAttributes2)));
-    expect(storageUtil.attributeStore.saveHostAttributes(
-        IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true);
-
-    JobUpdate actualUpdate = new JobUpdate()
-        .setSummary(new JobUpdateSummary().setKey(UPDATE_ID.newBuilder()))
-        .setInstructions(new JobUpdateInstructions()
-            .setInitialState(
-                ImmutableSet.of(new InstanceTaskConfig().setTask(nonBackfilledConfig())))
-            .setDesiredState(new InstanceTaskConfig().setTask(nonBackfilledConfig())));
-    JobUpdate expectedUpdate = actualUpdate.deepCopy();
-    expectedUpdate.getInstructions().getDesiredState().setTask(makeConfig(JOB_KEY).newBuilder());
-    expectedUpdate.getInstructions().getInitialState()
-        .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
-    SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
-    builder.add(createTransaction(Op.saveJobUpdate(saveUpdate)));
-    storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));
-
-    SaveJobUpdateEvent saveUpdateEvent =
-        new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder());
-    builder.add(createTransaction(Op.saveJobUpdateEvent(saveUpdateEvent)));
-    storageUtil.jobUpdateStore.saveJobUpdateEvent(
-        UPDATE_ID,
-        IJobUpdateEvent.build(saveUpdateEvent.getEvent()));
-
-    SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent(
-        new JobInstanceUpdateEvent(),
-        UPDATE_ID.newBuilder());
-    builder.add(createTransaction(Op.saveJobInstanceUpdateEvent(saveInstanceEvent)));
-    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
-        UPDATE_ID,
-        IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()));
-
-    builder.add(createTransaction(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L))));
-    // No expectation - this op is ignored.
-
-    builder.add(createTransaction(Op.removeJobUpdate(
-        new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder())))));
-    storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID));
-
-    // NOOP LogEntry
-    builder.add(LogEntry.noop(true));
-
-    // Snapshot LogEntry
-    Snapshot snapshot = new Snapshot();
-    builder.add(LogEntry.snapshot(snapshot));
-    snapshotStore.applySnapshot(snapshot);
-
-    ImmutableSet.Builder<Entry> entryBuilder = ImmutableSet.builder();
-    for (LogEntry logEntry : builder.build()) {
-      Entry entry = createMock(Entry.class);
-      entryBuilder.add(entry);
-      expect(entry.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(logEntry));
-    }
-
-    expect(stream.readAll()).andReturn(entryBuilder.build().iterator());
-  }
-
-  private TaskConfig nonBackfilledConfig() {
-    // When more fields have to be backfilled
-    // modify this method.
-    return makeConfig(JOB_KEY).newBuilder();
-  }
-
-  abstract class AbstractStorageFixture {
-    private final AtomicBoolean runCalled = new AtomicBoolean(false);
-
-    AbstractStorageFixture() {
-      // Prevent otherwise silent noop tests that forget to call run().
-      addTearDown(new TearDown() {
-        @Override
-        public void tearDown() {
-          assertTrue(runCalled.get());
-        }
-      });
-    }
-
-    void run() throws Exception {
-      runCalled.set(true);
-
-      // Expect basic start operations.
-
-      // Open the log stream.
-      expect(log.open()).andReturn(stream);
-
-      // Replay the log and perform and supplied initializationWork.
-      // Simulate NOOP initialization work
-      // Creating a mock and expecting apply(storeProvider) does not work here for whatever
-      // reason.
-      MutateWork.NoResult.Quiet initializationLogic = storeProvider -> {
-        // No-op.
-      };
-
-      Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
-      storageUtil.storage.write(capture(recoverAndInitializeWork));
-      expectLastCall().andAnswer(() -> {
-        recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
-        return null;
-      });
-
-      expect(stream.readAll()).andReturn(Collections.emptyIterator());
-      Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
-      expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
-          () -> {
-            recoveryWork.getValue().apply(storageUtil.mutableStoreProvider);
-            return null;
-          });
-
-      // Schedule snapshots.
-      schedulingService.doEvery(eq(SNAPSHOT_INTERVAL), notNull(Runnable.class));
-
-      // Setup custom test expectations.
-      setupExpectations();
-
-      control.replay();
-
-      // Start the system.
-      logStorage.prepare();
-      logStorage.start(initializationLogic);
-
-      // Exercise the system.
-      runTest();
-    }
-
-    protected void setupExpectations() throws Exception {
-      // Default to no expectations.
-    }
-
-    protected abstract void runTest();
-  }
-
-  abstract class AbstractMutationFixture extends AbstractStorageFixture {
-    @Override
-    protected void runTest() {
-      logStorage.write((Quiet) AbstractMutationFixture.this::performMutations);
-    }
-
-    protected abstract void performMutations(MutableStoreProvider storeProvider);
-  }
-
-  @Test
-  public void testSaveFrameworkId() throws Exception {
-    String frameworkId = "bob";
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws CodingException {
-        storageUtil.expectWrite();
-        storageUtil.schedulerStore.saveFrameworkId(frameworkId);
-        streamMatcher.expectTransaction(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getSchedulerStore().saveFrameworkId(frameworkId);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveAcceptedJob() throws Exception {
-    IJobConfiguration jobConfig =
-        IJobConfiguration.build(new JobConfiguration().setKey(JOB_KEY.newBuilder()));
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobStore.saveAcceptedJob(jobConfig);
-        streamMatcher.expectTransaction(
-            Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getCronJobStore().saveAcceptedJob(jobConfig);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testRemoveJob() throws Exception {
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobStore.removeJob(JOB_KEY);
-        streamMatcher.expectTransaction(
-            Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getCronJobStore().removeJob(JOB_KEY);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveTasks() throws Exception {
-    Set<IScheduledTask> tasks = ImmutableSet.of(task("a", ScheduleStatus.INIT));
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.taskStore.saveTasks(tasks);
-        streamMatcher.expectTransaction(
-            Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(tasks);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testMutateTasks() throws Exception {
-    String taskId = "fred";
-    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
-    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
-        streamMatcher.expectTransaction(
-            Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))))
-            .andReturn(null);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testNestedTransactions() throws Exception {
-    String taskId = "fred";
-    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
-    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
-    ImmutableSet<String> tasksToRemove = ImmutableSet.of("b");
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
-
-        storageUtil.taskStore.deleteTasks(tasksToRemove);
-
-        streamMatcher.expectTransaction(
-            Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))),
-            Op.removeTasks(new RemoveTasks(tasksToRemove)))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
-
-        logStorage.write((NoResult.Quiet)
-            innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveAndMutateTasks() throws Exception {
-    String taskId = "fred";
-    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
-    Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT));
-    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.taskStore.saveTasks(saved);
-
-        // Nested transaction with result.
-        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
-
-        // Resulting stream operation.
-        streamMatcher.expectTransaction(Op.saveTasks(
-            new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))))
-            .andReturn(null);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(saved);
-        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception {
-    String taskId = "fred";
-    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
-    Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT));
-    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.taskStore.saveTasks(saved);
-
-        // Nested transaction with result.
-        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
-
-        // Resulting stream operation.
-        streamMatcher.expectTransaction(
-            Op.saveTasks(new SaveTasks(
-                ImmutableSet.<ScheduledTask>builder()
-                    .addAll(IScheduledTask.toBuildersList(saved))
-                    .add(mutated.get().newBuilder())
-                    .build())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(saved);
-        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testRemoveTasksQuery() throws Exception {
-    IScheduledTask task = task("a", ScheduleStatus.FINISHED);
-    Set<String> taskIds = Tasks.ids(task);
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.taskStore.deleteTasks(taskIds);
-        streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testRemoveTasksIds() throws Exception {
-    Set<String> taskIds = ImmutableSet.of("42");
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.taskStore.deleteTasks(taskIds);
-        streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveQuota() throws Exception {
-    String role = "role";
-    IResourceAggregate quota = ResourceTestUtil.aggregate(1.0, 128L, 1024L);
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.quotaStore.saveQuota(role, quota);
-        streamMatcher.expectTransaction(Op.saveQuota(new SaveQuota(role, quota.newBuilder())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getQuotaStore().saveQuota(role, quota);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testRemoveQuota() throws Exception {
-    String role = "role";
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.quotaStore.removeQuota(role);
-        streamMatcher.expectTransaction(Op.removeQuota(new RemoveQuota(role))).andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getQuotaStore().removeQuota(role);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveHostAttributes() throws Exception {
-    String host = "hostname";
-    Set<Attribute> attributes =
-        ImmutableSet.of(new Attribute().setName("attr").setValues(ImmutableSet.of("value")));
-    Optional<IHostAttributes> hostAttributes = Optional.of(
-        IHostAttributes.build(new HostAttributes()
-            .setHost(host)
-            .setAttributes(attributes)));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        expect(storageUtil.attributeStore.getHostAttributes(host))
-            .andReturn(Optional.absent());
-
-        expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
-
-        expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())).andReturn(true);
-        eventSink.post(new PubsubEvent.HostAttributesChanged(hostAttributes.get()));
-        streamMatcher.expectTransaction(
-            Op.saveHostAttributes(new SaveHostAttributes(hostAttributes.get().newBuilder())))
-            .andReturn(position);
-
-        expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get()))
-            .andReturn(false);
-
-        expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        AttributeStore.Mutable store = storeProvider.getAttributeStore();
-        assertEquals(Optional.absent(), store.getHostAttributes(host));
-
-        assertTrue(store.saveHostAttributes(hostAttributes.get()));
-
-        assertEquals(hostAttributes, store.getHostAttributes(host));
-
-        assertFalse(store.saveHostAttributes(hostAttributes.get()));
-
-        assertEquals(hostAttributes, store.getHostAttributes(host));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveUpdate() throws Exception {
-    IJobUpdate update = IJobUpdate.build(new JobUpdate()
-        .setSummary(new JobUpdateSummary()
-            .setKey(UPDATE_ID.newBuilder())
-            .setUser("user"))
-        .setInstructions(new JobUpdateInstructions()
-            .setDesiredState(new InstanceTaskConfig()
-                .setTask(new TaskConfig())
-                .setInstances(ImmutableSet.of(new Range(0, 3))))
-            .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
-                .setTask(new TaskConfig())
-                .setInstances(ImmutableSet.of(new Range(0, 3)))))
-            .setSettings(new JobUpdateSettings())));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobUpdateStore.saveJobUpdate(update);
-        streamMatcher.expectTransaction(
-            Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().saveJobUpdate(update);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveJobUpdateEvent() throws Exception {
-    IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent()
-        .setStatus(JobUpdateStatus.ROLLING_BACK)
-        .setTimestampMs(12345L));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobUpdateStore.saveJobUpdateEvent(UPDATE_ID, event);
-        streamMatcher.expectTransaction(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(
-            event.newBuilder(),
-            UPDATE_ID.newBuilder()))).andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().saveJobUpdateEvent(UPDATE_ID, event);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveJobInstanceUpdateEvent() throws Exception {
-    IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(new JobInstanceUpdateEvent()
-        .setAction(JobUpdateAction.INSTANCE_ROLLING_BACK)
-        .setTimestampMs(12345L)
-        .setInstanceId(0));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(UPDATE_ID, event);
-        streamMatcher.expectTransaction(Op.saveJobInstanceUpdateEvent(
-            new SaveJobInstanceUpdateEvent(
-                event.newBuilder(),
-                UPDATE_ID.newBuilder())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(UPDATE_ID, event);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testRemoveJobUpdates() throws Exception {
-    IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey()
-        .setJob(JOB_KEY.newBuilder())
-        .setId("update-id"));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(key));
-
-        // No log transaction is generated since this version is currently in 'read-only'
-        // compatibility mode for this operation type.
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key));
-      }
-    }.run();
-  }
-
-  private LogEntry createTransaction(Op... ops) {
-    return LogEntry.transaction(
-        new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));
-  }
-
-  private static IScheduledTask task(String id, ScheduleStatus status) {
-    return IScheduledTask.build(new ScheduledTask()
-        .setStatus(status)
-        .setAssignedTask(new AssignedTask()
-            .setTaskId(id)
-            .setTask(new TaskConfig())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
index f43a836..eb966d7 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
@@ -42,6 +42,7 @@ import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.resources.ResourceTestUtil;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
@@ -61,6 +62,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
   private FakeLog log;
   private Runnable teardown = () -> { };
   private NonVolatileStorage storage;
+  private DistributedSnapshotStore snapshotStore;
 
   @Before
   public void setUp() {
@@ -95,6 +97,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
         }
     );
     storage = injector.getInstance(NonVolatileStorage.class);
+    snapshotStore = injector.getInstance(DistributedSnapshotStore.class);
     storage.prepare();
     storage.start(w -> { });
 
@@ -147,7 +150,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
           });
 
           // Result should survive another reset.
-          storage.snapshot();
+          snapshotStore.snapshot();
           resetStorage();
           storage.read(stores -> {
             transaction.getSecond().accept(stores);

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
index a1944c4..5634f92 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
@@ -55,6 +55,7 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java
deleted file mode 100644
index 59c2c5b..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.Resource.diskMb;
-import static org.apache.aurora.gen.Resource.namedPort;
-import static org.apache.aurora.gen.Resource.numCpus;
-import static org.apache.aurora.gen.Resource.ramMb;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class ThriftBackfillTest extends EasyMockTest {
-
-  private ThriftBackfill thriftBackfill;
-  private TierManager tierManager;
-
-  @Before
-  public void setUp() {
-    tierManager = createMock(TierManager.class);
-    thriftBackfill = new ThriftBackfill(tierManager);
-  }
-
-  @Test
-  public void testFieldsToSetNoPorts() {
-    TaskConfig config = new TaskConfig()
-        .setResources(ImmutableSet.of(
-            numCpus(1.0),
-            ramMb(32),
-            diskMb(64)))
-        .setProduction(false)
-        .setTier("tierName");
-    TaskConfig expected = config.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.DEV_TIER);
-
-    control.replay();
-
-    assertEquals(
-        expected,
-        thriftBackfill.backfillTask(config));
-  }
-
-  @Test
-  public void testResourceAggregateFieldsToSet() {
-    control.replay();
-
-    ResourceAggregate aggregate = new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(32)
-        .setDiskMb(64);
-
-    IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64))));
-
-    assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
-  }
-
-  @Test
-  public void testResourceAggregateSetToFields() {
-    control.replay();
-
-    ResourceAggregate aggregate = new ResourceAggregate()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
-        .setNumCpus(1.0)
-        .setRamMb(32)
-        .setDiskMb(64));
-
-    assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testResourceAggregateTooManyResources() {
-    control.replay();
-
-    ResourceAggregate aggregate = new ResourceAggregate()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64), numCpus(2.0)));
-    ThriftBackfill.backfillResourceAggregate(aggregate);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testResourceAggregateInvalidResources() {
-    control.replay();
-
-    ResourceAggregate aggregate = new ResourceAggregate()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), namedPort("http")));
-    ThriftBackfill.backfillResourceAggregate(aggregate);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testResourceAggregateMissingResources() {
-    control.replay();
-
-    ResourceAggregate aggregate = new ResourceAggregate()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32)));
-    ThriftBackfill.backfillResourceAggregate(aggregate);
-  }
-
-  @Test
-  public void testBackfillTierProduction() {
-    TaskConfig config = new TaskConfig()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
-        .setProduction(true)
-        .setTier("tierName");
-    TaskConfig expected = config.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.PREFERRED_TIER);
-
-    control.replay();
-
-    assertEquals(
-        expected,
-        thriftBackfill.backfillTask(config));
-  }
-
-  @Test
-  public void testBackfillTierNotProduction() {
-    TaskConfig config = new TaskConfig()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
-        .setProduction(true)
-        .setTier("tierName");
-    TaskConfig configWithBackfilledResources = config.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    expect(tierManager.getTier(ITaskConfig.build(configWithBackfilledResources)))
-        .andReturn(TaskTestUtil.DEV_TIER);
-
-    control.replay();
-
-    TaskConfig expected = configWithBackfilledResources.deepCopy()
-        .setProduction(false);
-
-    assertEquals(
-        expected,
-        thriftBackfill.backfillTask(config));
-  }
-
-  @Test
-  public void testBackfillTierSetsTierToPreemptible() {
-    TaskConfig config = new TaskConfig()
-            .setResources(ImmutableSet.of(
-                    numCpus(1.0),
-                    ramMb(32),
-                    diskMb(64)));
-    TaskConfig configWithBackfilledResources = config.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
-
-    control.replay();
-
-    TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preemptible");
-
-    assertEquals(
-        expected,
-        thriftBackfill.backfillTask(config));
-  }
-
-  @Test
-  public void testBackfillTierSetsTierToPreferred() {
-    TaskConfig config = new TaskConfig()
-        .setResources(ImmutableSet.of(
-            numCpus(1.0),
-            ramMb(32),
-            diskMb(64)))
-        .setProduction(true);
-    TaskConfig configWithBackfilledResources = config.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
-
-    control.replay();
-
-    TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preferred");
-
-    assertEquals(
-        expected,
-        thriftBackfill.backfillTask(config));
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testBackfillTierBadTierConfiguration() {
-    TaskConfig config = new TaskConfig()
-            .setResources(ImmutableSet.of(
-                    numCpus(1.0),
-                    ramMb(32),
-                    diskMb(64)));
-
-    expect(tierManager.getTiers()).andReturn(ImmutableMap.of());
-
-    control.replay();
-
-    thriftBackfill.backfillTask(config);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
deleted file mode 100644
index 8a99b36..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class WriteAheadStorageTest extends EasyMockTest {
-
-  private LogStorage.TransactionManager transactionManager;
-  private TaskStore.Mutable taskStore;
-  private AttributeStore.Mutable attributeStore;
-  private JobUpdateStore.Mutable jobUpdateStore;
-  private EventSink eventSink;
-  private WriteAheadStorage storage;
-
-  @Before
-  public void setUp() {
-    transactionManager = createMock(LogStorage.TransactionManager.class);
-    taskStore = createMock(TaskStore.Mutable.class);
-    attributeStore = createMock(AttributeStore.Mutable.class);
-    jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
-    eventSink = createMock(EventSink.class);
-
-    storage = new WriteAheadStorage(
-        transactionManager,
-        createMock(SchedulerStore.Mutable.class),
-        createMock(CronJobStore.Mutable.class),
-        taskStore,
-        createMock(QuotaStore.Mutable.class),
-        attributeStore,
-        jobUpdateStore,
-        LoggerFactory.getLogger(WriteAheadStorageTest.class),
-        eventSink);
-  }
-
-  private void expectOp(Op op) {
-    expect(transactionManager.hasActiveTransaction()).andReturn(true);
-    transactionManager.log(op);
-  }
-
-  @Test
-  public void testRemoveUpdates() {
-    Set<IJobUpdateKey> removed = ImmutableSet.of(
-        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")),
-        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b")));
-    jobUpdateStore.removeJobUpdates(removed);
-    // No operation is written since this Op is in read-only compatibility mode.
-
-    control.replay();
-
-    storage.removeJobUpdates(removed);
-  }
-
-  @Test
-  public void testMutate() {
-    String taskId = "a";
-    Function<IScheduledTask, IScheduledTask> mutator =
-        createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
-    Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
-
-    expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
-    expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
-
-    control.replay();
-
-    assertEquals(mutated, storage.mutateTask(taskId, mutator));
-  }
-
-  @Test
-  public void testSaveHostAttributes() {
-    IHostAttributes attributes = IHostAttributes.build(
-        new HostAttributes()
-            .setHost("a")
-            .setMode(MaintenanceMode.DRAINING)
-            .setAttributes(ImmutableSet.of(
-                new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
-
-    expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
-    expectOp(Op.saveHostAttributes(
-        new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
-    eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
-
-    expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
-
-    control.replay();
-
-    assertTrue(storage.saveHostAttributes(attributes));
-
-    assertFalse(storage.saveHostAttributes(attributes));
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteAllTasks() {
-    control.replay();
-    storage.deleteAllTasks();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteHostAttributes() {
-    control.replay();
-    storage.deleteHostAttributes();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteJobs() {
-    control.replay();
-    storage.deleteJobs();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteQuotas() {
-    control.replay();
-    storage.deleteQuotas();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteAllUpdatesAndEvents() {
-    control.replay();
-    storage.deleteAllUpdates();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 42a79a6..8837384 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -84,6 +84,7 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.UUIDGenerator;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup;
@@ -176,6 +177,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
       ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2"));
 
   private StorageTestUtil storageUtil;
+  private DistributedSnapshotStore snapshotStore;
   private StorageBackup backup;
   private Recovery recovery;
   private MaintenanceController maintenance;
@@ -194,6 +196,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void setUp() throws Exception {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
+    snapshotStore = createMock(DistributedSnapshotStore.class);
     backup = createMock(StorageBackup.class);
     recovery = createMock(Recovery.class);
     maintenance = createMock(MaintenanceController.class);
@@ -212,6 +215,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
             TaskTestUtil.CONFIGURATION_MANAGER,
             THRESHOLDS,
             storageUtil.storage,
+            snapshotStore,
             backup,
             recovery,
             cronJobManager,
@@ -1105,10 +1109,10 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testSnapshot() throws Exception {
-    storageUtil.storage.snapshot();
+    snapshotStore.snapshot();
     expectLastCall();
 
-    storageUtil.storage.snapshot();
+    snapshotStore.snapshot();
     expectLastCall().andThrow(new StorageException("mock error!"));
 
     control.replay();

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index b2c371c..bb0fd89 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -62,6 +62,7 @@ import org.apache.aurora.scheduler.quota.QuotaModule;
 import org.apache.aurora.scheduler.resources.ResourceTestUtil;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.stats.StatsModule;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
@@ -135,6 +136,7 @@ public class ThriftIT extends EasyMockTest {
             bind(FrameworkInfoFactoryImpl.class).in(Singleton.class);
             bindMock(Recovery.class);
             bindMock(StorageBackup.class);
+            bindMock(DistributedSnapshotStore.class);
             bind(IServerInfo.class).toInstance(SERVER_INFO);
           }
 


[3/4] aurora git commit: Extract a storage Persistence layer

Posted by wf...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
deleted file mode 100644
index 07b4bdb..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.common.application.ShutdownRegistry;
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.SlidingStats;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.base.AsyncUtil;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
-import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A storage implementation that ensures committed transactions are written to a log.
- *
- * <p>In the classic write-ahead log usage we'd perform mutations as follows:
- * <ol>
- *   <li>write op to log</li>
- *   <li>perform op locally</li>
- *   <li>*checkpoint</li>
- * </ol>
- *
- * <p>Writing the operation to the log provides us with a fast persistence mechanism to ensure we
- * have a record of our mutation in case we should need to recover state later after a crash or on
- * a new host (assuming the log is distributed).  We then apply the mutation to a local (in-memory)
- * data structure for serving fast read requests and then optionally write down the position of the
- * log entry we wrote in the first step to stable storage to allow for quicker recovery after a
- * crash. Instead of reading the whole log, we can read all entries past the checkpoint.  This
- * design implies that all mutations must be idempotent and free from constraint and thus
- * replayable over newer operations when recovering from an old checkpoint.
- *
- * <p>The important detail in our case is the possibility of writing an op to the log, and then
- * failing to commit locally since we use a local database instead of an in-memory data structure.
- * If we die after such a failure, then another instance can read and apply the logged op
- * erroneously.
- *
- * <p>This implementation leverages a local transaction to handle this:
- * <ol>
- *   <li>start local transaction</li>
- *   <li>perform op locally (uncommitted!)</li>
- *   <li>write op to log</li>
- *   <li>commit local transaction</li>
- *   <li>*checkpoint</li>
- * </ol>
- *
- * <p>If the op fails to apply to local storage we will never write the op to the log and if the op
- * fails to apply to the log, it'll throw and abort the local storage transaction as well.
- */
-public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore {
-
-  /**
-   * A service that can schedule an action to be executed periodically.
-   */
-  @VisibleForTesting
-  interface SchedulingService {
-
-    /**
-     * Schedules an action to execute periodically.
-     *
-     * @param interval The time period to wait until running the {@code action} again.
-     * @param action The action to execute periodically.
-     */
-    void doEvery(Amount<Long, Time> interval, Runnable action);
-  }
-
-  /**
-   * A maintainer for context about open transactions. Assumes that an external entity is
-   * responsible for opening and closing transactions.
-   */
-  interface TransactionManager {
-
-    /**
-     * Checks whether there is an open transaction.
-     *
-     * @return {@code true} if there is an open transaction, {@code false} otherwise.
-     */
-    boolean hasActiveTransaction();
-
-    /**
-     * Adds an operation to the existing transaction.
-     *
-     * @param op Operation to include in the existing transaction.
-     */
-    void log(Op op);
-  }
-
-  /**
-   * {@link SchedulingService} backed by a single-threaded scheduled executor. Shutdown of the
-   * executor is registered with the {@link ShutdownRegistry} and waits up to the given grace period.
-   */
-  private static class ScheduledExecutorSchedulingService implements SchedulingService {
-    private final ScheduledExecutorService scheduledExecutor;
-
-    ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
-        Amount<Long, Time> shutdownGracePeriod) {
-      scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG);
-      shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination(
-          scheduledExecutor,
-          shutdownGracePeriod.getValue(),
-          shutdownGracePeriod.getUnit().getTimeUnit()));
-    }
-
-    @Override
-    public void doEvery(Amount<Long, Time> interval, Runnable action) {
-      requireNonNull(interval);
-      requireNonNull(action);
-
-      // The interval serves as both the initial delay and the delay between consecutive runs.
-      long delay = interval.getValue();
-      TimeUnit timeUnit = interval.getUnit().getTimeUnit();
-      scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
-    }
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(LogStorage.class);
-
-  private final LogManager logManager;
-  private final SchedulingService schedulingService;
-  private final SnapshotStore<Snapshot> snapshotStore;
-  private final Amount<Long, Time> snapshotInterval;
-  private final Storage writeBehindStorage;
-  private final SchedulerStore.Mutable writeBehindSchedulerStore;
-  private final CronJobStore.Mutable writeBehindJobStore;
-  private final TaskStore.Mutable writeBehindTaskStore;
-  private final QuotaStore.Mutable writeBehindQuotaStore;
-  private final AttributeStore.Mutable writeBehindAttributeStore;
-  private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
-  private final ReentrantLock writeLock;
-  private final ThriftBackfill thriftBackfill;
-
-  // Assigned by prepare(); remains null until prepare() has opened the log.
-  private StreamManager streamManager;
-  // Store provider handed to MutateWork once recovered. It wraps the same Mutable stores as the
-  // writeBehind* fields and presumably records each mutation as an Op through the
-  // TransactionManager built in the constructor.
-  private final WriteAheadStorage writeAheadStorage;
-
-  // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
-  // recovering are controlled at this layer (they're all calls to Mutable store implementations).
-  // The more involved change is changing SnapshotStore to accept a Mutable store provider to
-  // avoid a call to Storage.write() when we replay a Snapshot.
-  // Set to true by start() once recover() has replayed the log. While false, write() hands work
-  // straight to writeBehindStorage without appending anything to the log.
-  private boolean recovered = false;
-  // Log transaction of the outermost in-flight write. Nested writes reuse it, and doInTransaction
-  // resets it to null when the outermost write completes or fails.
-  private StreamTransaction transaction = null;
-
-  private final SlidingStats writerWaitStats =
-      new SlidingStats("log_storage_write_lock_wait", "ns");
-
-  // Recovery dispatch tables, keyed by the field reported by getSetField() on each entry/op.
-  private final Map<LogEntry._Fields, Consumer<LogEntry>> logEntryReplayActions;
-  private final Map<Op._Fields, Consumer<Op>> transactionReplayActions;
-
-  // Injected constructor: derives the scheduling service and snapshot interval from settings.
-  @Inject
-  LogStorage(
-      LogManager logManager,
-      ShutdownRegistry shutdownRegistry,
-      Settings settings,
-      SnapshotStore<Snapshot> snapshotStore,
-      @Volatile Storage storage,
-      @Volatile SchedulerStore.Mutable schedulerStore,
-      @Volatile CronJobStore.Mutable jobStore,
-      @Volatile TaskStore.Mutable taskStore,
-      @Volatile QuotaStore.Mutable quotaStore,
-      @Volatile AttributeStore.Mutable attributeStore,
-      @Volatile JobUpdateStore.Mutable jobUpdateStore,
-      EventSink eventSink,
-      ReentrantLock writeLock,
-      ThriftBackfill thriftBackfill) {
-
-    this(logManager,
-        new ScheduledExecutorSchedulingService(shutdownRegistry, settings.getShutdownGracePeriod()),
-        snapshotStore,
-        settings.getSnapshotInterval(),
-        storage,
-        schedulerStore,
-        jobStore,
-        taskStore,
-        quotaStore,
-        attributeStore,
-        jobUpdateStore,
-        eventSink,
-        writeLock,
-        thriftBackfill);
-  }
-
-  // Constructor taking the scheduling service and snapshot interval directly (exposed for tests).
-  @VisibleForTesting
-  LogStorage(
-      LogManager logManager,
-      SchedulingService schedulingService,
-      SnapshotStore<Snapshot> snapshotStore,
-      Amount<Long, Time> snapshotInterval,
-      Storage delegateStorage,
-      SchedulerStore.Mutable schedulerStore,
-      CronJobStore.Mutable jobStore,
-      TaskStore.Mutable taskStore,
-      QuotaStore.Mutable quotaStore,
-      AttributeStore.Mutable attributeStore,
-      JobUpdateStore.Mutable jobUpdateStore,
-      EventSink eventSink,
-      ReentrantLock writeLock,
-      ThriftBackfill thriftBackfill) {
-
-    this.logManager = requireNonNull(logManager);
-    this.schedulingService = requireNonNull(schedulingService);
-    this.snapshotStore = requireNonNull(snapshotStore);
-    this.snapshotInterval = requireNonNull(snapshotInterval);
-
-    // Log storage has two distinct operating modes: pre- and post-recovery.  When recovering,
-    // we write directly to the writeBehind stores since we are replaying what's already persisted.
-    // After that, all writes must succeed in the distributed log before they may be considered
-    // successful.
-    this.writeBehindStorage = requireNonNull(delegateStorage);
-    this.writeBehindSchedulerStore = requireNonNull(schedulerStore);
-    this.writeBehindJobStore = requireNonNull(jobStore);
-    this.writeBehindTaskStore = requireNonNull(taskStore);
-    this.writeBehindQuotaStore = requireNonNull(quotaStore);
-    this.writeBehindAttributeStore = requireNonNull(attributeStore);
-    this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
-    this.writeLock = requireNonNull(writeLock);
-    this.thriftBackfill = requireNonNull(thriftBackfill);
-    TransactionManager transactionManager = new TransactionManager() {
-      @Override
-      public boolean hasActiveTransaction() {
-        return transaction != null;
-      }
-
-      @Override
-      public void log(Op op) {
-        // Assumes hasActiveTransaction() is true; with no open transaction this would throw NPE.
-        transaction.add(op);
-      }
-    };
-    this.writeAheadStorage = new WriteAheadStorage(
-        transactionManager,
-        schedulerStore,
-        jobStore,
-        taskStore,
-        quotaStore,
-        attributeStore,
-        jobUpdateStore,
-        LoggerFactory.getLogger(WriteAheadStorage.class),
-        eventSink);
-
-    this.logEntryReplayActions = buildLogEntryReplayActions();
-    this.transactionReplayActions = buildTransactionReplayActions();
-  }
-
-  @VisibleForTesting
-  final Map<LogEntry._Fields, Consumer<LogEntry>> buildLogEntryReplayActions() {
-    return ImmutableMap.<LogEntry._Fields, Consumer<LogEntry>>builder()
-        .put(LogEntry._Fields.SNAPSHOT, logEntry -> {
-          Snapshot snapshot = logEntry.getSnapshot();
-          LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
-          snapshotStore.applySnapshot(snapshot);
-        })
-        // All ops of one logged transaction are replayed inside a single write() call.
-        .put(LogEntry._Fields.TRANSACTION, logEntry -> write((NoResult.Quiet) unused -> {
-          for (Op op : logEntry.getTransaction().getOps()) {
-            replayOp(op);
-          }
-        }))
-        .put(LogEntry._Fields.NOOP, item -> {
-          // Nothing to do here
-        })
-        .build();
-  }
-
-  @VisibleForTesting
-  final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() {
-    return ImmutableMap.<Op._Fields, Consumer<Op>>builder()
-        .put(
-            Op._Fields.SAVE_FRAMEWORK_ID,
-            op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
-        .put(Op._Fields.SAVE_CRON_JOB, op -> {
-          SaveCronJob cronJob = op.getSaveCronJob();
-          writeBehindJobStore.saveAcceptedJob(
-              thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig()));
-        })
-        .put(
-            Op._Fields.REMOVE_JOB,
-            op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())))
-        .put(
-            Op._Fields.SAVE_TASKS,
-            op -> writeBehindTaskStore.saveTasks(
-                thriftBackfill.backfillTasks(op.getSaveTasks().getTasks())))
-        .put(
-            Op._Fields.REMOVE_TASKS,
-            op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()))
-        .put(Op._Fields.SAVE_QUOTA, op -> {
-          SaveQuota saveQuota = op.getSaveQuota();
-          writeBehindQuotaStore.saveQuota(
-              saveQuota.getRole(),
-              ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
-        })
-        .put(
-            Op._Fields.REMOVE_QUOTA,
-            op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()))
-        .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> {
-          HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
-          // Prior to commit 5cf760b, the store would persist maintenance mode changes for
-          // unknown hosts.  5cf760b began rejecting these, but the replicated log may still
-          // contain entries with a null slave ID.
-          if (attributes.isSetSlaveId()) {
-            writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
-          } else {
-            LOG.info("Dropping host attributes with no agent ID: " + attributes);
-          }
-        })
-        .put(Op._Fields.SAVE_JOB_UPDATE, op ->
-          writeBehindJobUpdateStore.saveJobUpdate(
-              thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())))
-        .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
-          SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
-          // NOTE(review): re-reads op.getSaveJobUpdateEvent() rather than using the local 'event';
-          // the two are equivalent, but 'event.getEvent()' would be clearer.
-          writeBehindJobUpdateStore.saveJobUpdateEvent(
-              IJobUpdateKey.build(event.getKey()),
-              IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
-        })
-        .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> {
-          SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
-          // NOTE(review): same redundant re-read as SAVE_JOB_UPDATE_EVENT above.
-          writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
-              IJobUpdateKey.build(event.getKey()),
-              IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
-        })
-        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
-          LOG.info("Dropping prune operation.  Updates will be pruned later.");
-        })
-        .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
-          writeBehindJobUpdateStore.removeJobUpdates(
-              IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
-        .build();
-  }
-
-  // Prepares the write-behind storage, then opens the log. Failure to open the log is fatal.
-  @Override
-  @Timed("scheduler_storage_prepare")
-  public synchronized void prepare() {
-    writeBehindStorage.prepare();
-    // Open the log to make a log replica available to the scheduler group.
-    try {
-      streamManager = logManager.open();
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to open the log, cannot continue", e);
-    }
-  }
-
-  @Override
-  @Timed("scheduler_storage_start")
-  public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
-    write((NoResult.Quiet) unused -> {
-      // Must have the underlying storage started so we can query it for the last checkpoint.
-      // We replay these entries in the forwarded storage system's transactions but not ours - we
-      // do not want to re-record these ops to the log.
-      recover();
-      recovered = true;
-
-      // Now that we're recovered we should let any mutations done in initializationLogic append
-      // to the log, so run it in one of our transactions.
-      write(initializationLogic);
-    });
-
-    // Periodic snapshots are scheduled only after recovery and initialization have completed.
-    scheduleSnapshots();
-  }
-
-  @Override
-  public void stop() {
-    // No-op.
-  }
-
-  // Replays every entry from the start of the log through replay(). Decoding, position and
-  // stream-access failures are rethrown as RecoveryFailedException.
-  @Timed("scheduler_log_recover")
-  void recover() throws RecoveryFailedException {
-    try {
-      streamManager.readFromBeginning(LogStorage.this::replay);
-    } catch (CodingException | InvalidPositionException | StreamAccessException e) {
-      throw new RecoveryFailedException(e);
-    }
-  }
-
-  private static final class RecoveryFailedException extends SchedulerException {
-    RecoveryFailedException(Throwable cause) {
-      super(cause);
-    }
-  }
-
-  // Dispatches one log entry to its replay action; unknown entry types fail fast.
-  private void replay(final LogEntry logEntry) {
-    LogEntry._Fields entryField = logEntry.getSetField();
-    if (!logEntryReplayActions.containsKey(entryField)) {
-      throw new IllegalStateException("Unknown log entry type: " + entryField);
-    }
-
-    logEntryReplayActions.get(entryField).accept(logEntry);
-  }
-
-  // Dispatches one transaction op to its replay action; unknown op types fail fast.
-  private void replayOp(Op op) {
-    Op._Fields opField = op.getSetField();
-    if (!transactionReplayActions.containsKey(opField)) {
-      throw new IllegalStateException("Unknown transaction op: " + opField);
-    }
-
-    transactionReplayActions.get(opField).accept(op);
-  }
-
-  // A non-positive snapshotInterval disables periodic snapshots entirely.
-  private void scheduleSnapshots() {
-    if (snapshotInterval.getValue() > 0) {
-      schedulingService.doEvery(snapshotInterval, () -> {
-        try {
-          snapshot();
-        } catch (StorageException e) {
-          // Logged rather than rethrown: with scheduleWithFixedDelay, an exception escaping the
-          // task would suppress all later runs. Other RuntimeExceptions are not caught here.
-          if (e.getCause() == null) {
-            LOG.warn("StorageException when attempting to snapshot.", e);
-          } else {
-            LOG.warn(e.getMessage(), e.getCause());
-          }
-        }
-      });
-    }
-  }
-
-  /**
-   * Forces a snapshot of the storage state.
-   *
-   * @throws CodingException If there is a problem encoding the snapshot.
-   * @throws InvalidPositionException If the log stream cursor is invalid.
-   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
-   */
-  @Timed("scheduler_log_snapshot")
-  void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
-    // Runs under write() so the snapshot is created and persisted while holding writeLock.
-    write((NoResult<CodingException>) (MutableStoreProvider unused) -> {
-      LOG.info("Creating snapshot.");
-      Snapshot snapshot = snapshotStore.createSnapshot();
-      persist(snapshot);
-      LOG.info("Snapshot complete."
-          + " host attrs: " + snapshot.getHostAttributesSize()
-          + ", cron jobs: " + snapshot.getCronJobsSize()
-          + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
-          + ", tasks: " + snapshot.getTasksSize()
-          + ", updates: " + snapshot.getJobUpdateDetailsSize());
-    });
-  }
-
-  @Timed("scheduler_log_snapshot_persist")
-  @Override
-  public void persist(Snapshot snapshot)
-      throws CodingException, InvalidPositionException, StreamAccessException {
-
-    streamManager.snapshot(snapshot);
-  }
-
-  private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
-      throws StorageException, E {
-
-    // The log stream transaction has already been set up so we just need to delegate with our
-    // store provider so any mutations performed by work get logged.
-    if (transaction != null) {
-      return work.apply(writeAheadStorage);
-    }
-
-    transaction = streamManager.startTransaction();
-    try {
-      // The log commit happens inside the local write, so a failed commit throws out of the
-      // local transaction rather than letting it complete.
-      return writeBehindStorage.write(unused -> {
-        T result = work.apply(writeAheadStorage);
-        try {
-          transaction.commit();
-        } catch (CodingException e) {
-          throw new IllegalStateException(
-              "Problem encoding transaction operations to the log stream", e);
-        } catch (StreamAccessException e) {
-          throw new StorageException(
-              "There was a problem committing the transaction to the log.", e);
-        }
-        return result;
-      });
-    } finally {
-      // Cleared on success and failure alike so the next top-level write starts a fresh transaction.
-      transaction = null;
-    }
-  }
-
-  @Override
-  public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
-    long waitStart = System.nanoTime();
-    // writeLock is a ReentrantLock, so nested write() calls on the same thread (e.g. start()
-    // invoking write(initializationLogic) inside an outer write) do not deadlock.
-    writeLock.lock();
-    try {
-      writerWaitStats.accumulate(System.nanoTime() - waitStart);
-      // We don't want to use the log when recovering from it, we just want to update the underlying
-      // store - so pass mutations straight through to the underlying storage.
-      if (!recovered) {
-        return writeBehindStorage.write(work);
-      }
-
-      return doInTransaction(work);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  // Reads never touch the log; they are served directly by the write-behind storage.
-  @Override
-  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
-    return writeBehindStorage.read(work);
-  }
-
-  // Translates the checked log exceptions raised by doSnapshot() into StorageException.
-  @Override
-  public void snapshot() throws StorageException {
-    try {
-      doSnapshot();
-    } catch (CodingException e) {
-      throw new StorageException("Failed to encode a snapshot", e);
-    } catch (InvalidPositionException e) {
-      throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
-    } catch (StreamAccessException e) {
-      throw new StorageException("Failed to create a snapshot", e);
-    }
-  }
-
-  /**
-   * Configuration settings for log storage.
-   */
-  public static class Settings {
-    private final Amount<Long, Time> shutdownGracePeriod;
-    private final Amount<Long, Time> snapshotInterval;
-
-    public Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
-      this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
-      this.snapshotInterval = requireNonNull(snapshotInterval);
-    }
-
-    public Amount<Long, Time> getShutdownGracePeriod() {
-      return shutdownGracePeriod;
-    }
-
-    public Amount<Long, Time> getSnapshotInterval() {
-      return snapshotInterval;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index c8dc7ad..75ec42a 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -32,8 +32,10 @@ import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
 import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import org.apache.aurora.scheduler.storage.log.LogStorage.Settings;
+import org.apache.aurora.scheduler.storage.log.LogPersistence.Settings;
 
 import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
 import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
@@ -77,10 +79,13 @@ public class LogStorageModule extends PrivateModule {
     bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
         .toInstance(options.maxLogEntrySize);
     bind(LogManager.class).in(Singleton.class);
-    bind(LogStorage.class).in(Singleton.class);
+    bind(DurableStorage.class).in(Singleton.class);
 
-    install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class));
-    bind(DistributedSnapshotStore.class).to(LogStorage.class);
+    install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
+    bind(LogPersistence.class).in(Singleton.class);
+    bind(Persistence.class).to(LogPersistence.class);
+    bind(DistributedSnapshotStore.class).to(LogPersistence.class);
+    expose(Persistence.class);
     expose(Storage.class);
     expose(NonVolatileStorage.class);
     expose(DistributedSnapshotStore.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 5859f80..739fad7 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -48,6 +48,7 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
index ea147c0..18da32d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
@@ -13,7 +13,7 @@
  */
 package org.apache.aurora.scheduler.storage.log;
 
-import java.util.function.Consumer;
+import java.util.Iterator;
 
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.Snapshot;
@@ -25,23 +25,21 @@ import static org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
 
 /**
  * Manages interaction with the log stream.  Log entries can be
- * {@link #readFromBeginning(Consumer) read from} the beginning,
+ * {@link #readFromBeginning() read from} the beginning,
  * a {@link #startTransaction() transaction} consisting of one or more local storage
  * operations can be committed atomically, or the log can be compacted by
  * {@link #snapshot(org.apache.aurora.gen.storage.Snapshot) snapshotting}.
  */
 public interface StreamManager {
   /**
-   * Reads all entries in the log stream after the given position.  If the position
-   * supplied is {@code null} then all log entries in the stream will be read.
+   * Reads all entries in the log stream.
    *
-   * @param reader A reader that will be handed log entries decoded from the stream.
+   * @return All stored log entries.
    * @throws CodingException if there was a problem decoding a log entry from the stream.
    * @throws InvalidPositionException if the given position is not found in the log.
    * @throws StreamAccessException if there is a problem reading from the log.
    */
-  void readFromBeginning(Consumer<LogEntry> reader)
-      throws CodingException, InvalidPositionException, StreamAccessException;
+  Iterator<LogEntry> readFromBeginning() throws CodingException, StreamAccessException;
 
   /**
    * Truncates all entries in the log stream occuring before the given position.  The entry at the
@@ -54,8 +52,7 @@ public interface StreamManager {
   void truncateBefore(Log.Position position);
 
   /**
-   * Starts a transaction that can be used to commit a series of {@link Op}s to the log stream
-   * atomically.
+   * Starts a transaction that can be used to commit a series of ops to the log stream atomically.
    *
    * @return StreamTransaction A transaction manager to handle batching up commits to the
    *    underlying stream.

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
index baf2647..c5b107f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -19,12 +19,12 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
 
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -95,31 +95,37 @@ class StreamManagerImpl implements StreamManager {
   }
 
   @Override
-  public void readFromBeginning(Consumer<LogEntry> reader)
+  public Iterator<LogEntry> readFromBeginning()
       throws CodingException, InvalidPositionException, StreamAccessException {
 
     Iterator<Log.Entry> entries = stream.readAll();
 
-    while (entries.hasNext()) {
-      LogEntry logEntry = decodeLogEntry(entries.next());
-      while (logEntry != null && isFrame(logEntry)) {
-        logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
-      }
-      if (logEntry != null) {
-        if (logEntry.isSet(LogEntry._Fields.DEFLATED_ENTRY)) {
-          logEntry = Entries.inflate(logEntry);
-          vars.deflatedEntriesRead.incrementAndGet();
-        }
-
-        if (logEntry.isSetDeduplicatedSnapshot()) {
-          logEntry = LogEntry.snapshot(
-              snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot()));
+    // Decoding is lazy: computeNext() runs only as the caller advances the returned iterator.
+    // Decoding and stream errors therefore surface from hasNext()/next(), not from this call.
+    // NOTE(review): the throws clause above is carried over from the callback-based version;
+    // confirm which of these can still be raised before iteration starts (e.g. by readAll()).
+    return new AbstractIterator<LogEntry>() {
+      @Override
+      protected LogEntry computeNext() {
+        while (entries.hasNext()) {
+          LogEntry logEntry = decodeLogEntry(entries.next());
+          // A frame header pulls its remaining chunks from 'entries'. Presumably tryDecodeFrame
+          // returns null when a frame cannot be reassembled; null entries are skipped below.
+          while (logEntry != null && isFrame(logEntry)) {
+            logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
+          }
+          if (logEntry != null) {
+            if (logEntry.isSet(LogEntry._Fields.DEFLATED_ENTRY)) {
+              logEntry = Entries.inflate(logEntry);
+              vars.deflatedEntriesRead.incrementAndGet();
+            }
+
+            if (logEntry.isSetDeduplicatedSnapshot()) {
+              logEntry = LogEntry.snapshot(
+                  snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot()));
+            }
+
+            vars.entriesRead.incrementAndGet();
+            return logEntry;
+          }
         }
-
-        reader.accept(logEntry);
-        vars.entriesRead.incrementAndGet();
+        // The underlying stream is exhausted.
+        return endOfData();
       }
-    }
+    };
   }
 
   @Nullable

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
deleted file mode 100644
index 92b64bb..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import com.google.inject.Inject;
-
-import org.apache.aurora.GuavaUtils;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateInstructions;
-import org.apache.aurora.gen.Resource;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.TierInfo;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.resources.ResourceType;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IResource;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.lang.String.format;
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
-import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
-import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
-
-/**
- * Helps migrating thrift schema by populating deprecated and/or replacement fields.
- */
-public final class ThriftBackfill {
-
-  private final TierManager tierManager;
-
-  @Inject
-  public ThriftBackfill(TierManager tierManager) {
-    this.tierManager = requireNonNull(tierManager);
-  }
-
-  private static Resource getResource(Set<Resource> resources, ResourceType type) {
-    return resources.stream()
-            .filter(e -> ResourceType.fromResource(IResource.build(e)).equals(type))
-            .findFirst()
-            .orElseThrow(() ->
-                    new IllegalArgumentException("Missing resource definition for " + type));
-  }
-
-  /**
-   * Ensures TaskConfig.resources and correspondent task-level fields are all populated.
-   *
-   * @param config TaskConfig to backfill.
-   * @return Backfilled TaskConfig.
-   */
-  public TaskConfig backfillTask(TaskConfig config) {
-    backfillTier(config);
-    return config;
-  }
-
-  private void backfillTier(TaskConfig config) {
-    ITaskConfig taskConfig = ITaskConfig.build(config);
-    if (config.isSetTier()) {
-      TierInfo tier = tierManager.getTier(taskConfig);
-      config.setProduction(!tier.isPreemptible() && !tier.isRevocable());
-    } else {
-      config.setTier(tierManager.getTiers()
-          .entrySet()
-          .stream()
-          .filter(e -> e.getValue().isPreemptible() == !taskConfig.isProduction()
-              && !e.getValue().isRevocable())
-          .findFirst()
-          .orElseThrow(() -> new IllegalStateException(
-              format("No matching implicit tier for task of job %s", taskConfig.getJob())))
-          .getKey());
-    }
-  }
-
-  /**
-   * Backfills JobConfiguration. See {@link #backfillTask(TaskConfig)}.
-   *
-   * @param jobConfig JobConfiguration to backfill.
-   * @return Backfilled JobConfiguration.
-   */
-  public IJobConfiguration backfillJobConfiguration(JobConfiguration jobConfig) {
-    backfillTask(jobConfig.getTaskConfig());
-    return IJobConfiguration.build(jobConfig);
-  }
-
-  /**
-   * Backfills set of tasks. See {@link #backfillTask(TaskConfig)}.
-   *
-   * @param tasks Set of tasks to backfill.
-   * @return Backfilled set of tasks.
-   */
-  public Set<IScheduledTask> backfillTasks(Set<ScheduledTask> tasks) {
-    return tasks.stream()
-        .map(t -> backfillScheduledTask(t))
-        .map(IScheduledTask::build)
-        .collect(GuavaUtils.toImmutableSet());
-  }
-
-  /**
-   * Ensures ResourceAggregate.resources and correspondent deprecated fields are all populated.
-   *
-   * @param aggregate ResourceAggregate to backfill.
-   * @return Backfilled IResourceAggregate.
-   */
-  public static IResourceAggregate backfillResourceAggregate(ResourceAggregate aggregate) {
-    if (!aggregate.isSetResources() || aggregate.getResources().isEmpty()) {
-      aggregate.addToResources(Resource.numCpus(aggregate.getNumCpus()));
-      aggregate.addToResources(Resource.ramMb(aggregate.getRamMb()));
-      aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb()));
-    } else {
-      EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES;
-      if (aggregate.getResources().size() > quotaResources.size()) {
-        throw new IllegalArgumentException("Too many resource values in quota.");
-      }
-
-      if (!quotaResources.equals(aggregate.getResources().stream()
-              .map(e -> ResourceType.fromResource(IResource.build(e)))
-              .collect(Collectors.toSet()))) {
-
-        throw new IllegalArgumentException("Quota resources must be exactly: " + quotaResources);
-      }
-      aggregate.setNumCpus(
-              getResource(aggregate.getResources(), CPUS).getNumCpus());
-      aggregate.setRamMb(
-              getResource(aggregate.getResources(), RAM_MB).getRamMb());
-      aggregate.setDiskMb(
-              getResource(aggregate.getResources(), DISK_MB).getDiskMb());
-    }
-    return IResourceAggregate.build(aggregate);
-  }
-
-  private ScheduledTask backfillScheduledTask(ScheduledTask task) {
-    backfillTask(task.getAssignedTask().getTask());
-    return task;
-  }
-
-  /**
-   * Backfills JobUpdate. See {@link #backfillTask(TaskConfig)}.
-   *
-   * @param update JobUpdate to backfill.
-   * @return Backfilled job update.
-   */
-  IJobUpdate backFillJobUpdate(JobUpdate update) {
-    JobUpdateInstructions instructions = update.getInstructions();
-    if (instructions.isSetDesiredState()) {
-      backfillTask(instructions.getDesiredState().getTask());
-    }
-
-    instructions.getInitialState().forEach(e -> backfillTask(e.getTask()));
-
-    return IJobUpdate.build(update);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
deleted file mode 100644
index 41061f8..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveQuota;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdate;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.slf4j.Logger;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.storage.log.LogStorage.TransactionManager;
-
-/**
- * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
- * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
- * stores.
- */
-class WriteAheadStorage implements
-    MutableStoreProvider,
-    SchedulerStore.Mutable,
-    CronJobStore.Mutable,
-    TaskStore.Mutable,
-    QuotaStore.Mutable,
-    AttributeStore.Mutable,
-    JobUpdateStore.Mutable {
-
-  private final TransactionManager transactionManager;
-  private final SchedulerStore.Mutable schedulerStore;
-  private final CronJobStore.Mutable jobStore;
-  private final TaskStore.Mutable taskStore;
-  private final QuotaStore.Mutable quotaStore;
-  private final AttributeStore.Mutable attributeStore;
-  private final JobUpdateStore.Mutable jobUpdateStore;
-  private final Logger log;
-  private final EventSink eventSink;
-
-  /**
-   * Creates a new write-ahead storage that delegates to the providing default stores.
-   *
-   * @param transactionManager External controller for transaction operations.
-   * @param schedulerStore Delegate.
-   * @param jobStore       Delegate.
-   * @param taskStore      Delegate.
-   * @param quotaStore     Delegate.
-   * @param attributeStore Delegate.
-   * @param jobUpdateStore Delegate.
-   */
-  WriteAheadStorage(
-      TransactionManager transactionManager,
-      SchedulerStore.Mutable schedulerStore,
-      CronJobStore.Mutable jobStore,
-      TaskStore.Mutable taskStore,
-      QuotaStore.Mutable quotaStore,
-      AttributeStore.Mutable attributeStore,
-      JobUpdateStore.Mutable jobUpdateStore,
-      Logger log,
-      EventSink eventSink) {
-
-    this.transactionManager = requireNonNull(transactionManager);
-    this.schedulerStore = requireNonNull(schedulerStore);
-    this.jobStore = requireNonNull(jobStore);
-    this.taskStore = requireNonNull(taskStore);
-    this.quotaStore = requireNonNull(quotaStore);
-    this.attributeStore = requireNonNull(attributeStore);
-    this.jobUpdateStore = requireNonNull(jobUpdateStore);
-    this.log = requireNonNull(log);
-    this.eventSink = requireNonNull(eventSink);
-  }
-
-  private void write(Op op) {
-    Preconditions.checkState(
-        transactionManager.hasActiveTransaction(),
-        "Mutating operations must be within a transaction.");
-    transactionManager.log(op);
-  }
-
-  @Override
-  public void saveFrameworkId(final String frameworkId) {
-    requireNonNull(frameworkId);
-
-    write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
-    schedulerStore.saveFrameworkId(frameworkId);
-  }
-
-  @Override
-  public void deleteTasks(final Set<String> taskIds) {
-    requireNonNull(taskIds);
-
-    write(Op.removeTasks(new RemoveTasks(taskIds)));
-    taskStore.deleteTasks(taskIds);
-  }
-
-  @Override
-  public void saveTasks(final Set<IScheduledTask> newTasks) {
-    requireNonNull(newTasks);
-
-    write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
-    taskStore.saveTasks(newTasks);
-  }
-
-  @Override
-  public Optional<IScheduledTask> mutateTask(
-      String taskId,
-      Function<IScheduledTask, IScheduledTask> mutator) {
-
-    Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
-    log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
-    write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
-
-    return mutated;
-  }
-
-  @Override
-  public void saveQuota(final String role, final IResourceAggregate quota) {
-    requireNonNull(role);
-    requireNonNull(quota);
-
-    write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
-    quotaStore.saveQuota(role, quota);
-  }
-
-  @Override
-  public boolean saveHostAttributes(final IHostAttributes attrs) {
-    requireNonNull(attrs);
-
-    boolean changed = attributeStore.saveHostAttributes(attrs);
-    if (changed) {
-      write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
-      eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
-    }
-    return changed;
-  }
-
-  @Override
-  public void removeJob(final IJobKey jobKey) {
-    requireNonNull(jobKey);
-
-    write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
-    jobStore.removeJob(jobKey);
-  }
-
-  @Override
-  public void saveAcceptedJob(final IJobConfiguration jobConfig) {
-    requireNonNull(jobConfig);
-
-    write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
-    jobStore.saveAcceptedJob(jobConfig);
-  }
-
-  @Override
-  public void removeQuota(final String role) {
-    requireNonNull(role);
-
-    write(Op.removeQuota(new RemoveQuota(role)));
-    quotaStore.removeQuota(role);
-  }
-
-  @Override
-  public void saveJobUpdate(IJobUpdate update) {
-    requireNonNull(update);
-
-    write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
-    jobUpdateStore.saveJobUpdate(update);
-  }
-
-  @Override
-  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
-    requireNonNull(key);
-    requireNonNull(event);
-
-    write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
-    jobUpdateStore.saveJobUpdateEvent(key, event);
-  }
-
-  @Override
-  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
-    requireNonNull(key);
-    requireNonNull(event);
-
-    write(Op.saveJobInstanceUpdateEvent(
-        new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
-    jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
-  }
-
-  @Override
-  public void removeJobUpdates(Set<IJobUpdateKey> keys) {
-    requireNonNull(keys);
-
-    // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
-    // read it.  JobUpdates are only removed implicitly when a snapshot is taken.
-    jobUpdateStore.removeJobUpdates(keys);
-  }
-
-  @Override
-  public void deleteAllTasks() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteHostAttributes() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteJobs() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteQuotas() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public void deleteAllUpdates() {
-    throw new UnsupportedOperationException(
-        "Unsupported since casual storage users should never be doing this.");
-  }
-
-  @Override
-  public SchedulerStore.Mutable getSchedulerStore() {
-    return this;
-  }
-
-  @Override
-  public CronJobStore.Mutable getCronJobStore() {
-    return this;
-  }
-
-  @Override
-  public TaskStore.Mutable getUnsafeTaskStore() {
-    return this;
-  }
-
-  @Override
-  public QuotaStore.Mutable getQuotaStore() {
-    return this;
-  }
-
-  @Override
-  public AttributeStore.Mutable getAttributeStore() {
-    return this;
-  }
-
-  @Override
-  public TaskStore getTaskStore() {
-    return this;
-  }
-
-  @Override
-  public JobUpdateStore.Mutable getJobUpdateStore() {
-    return this;
-  }
-
-  @Override
-  public Optional<String> fetchFrameworkId() {
-    return this.schedulerStore.fetchFrameworkId();
-  }
-
-  @Override
-  public Iterable<IJobConfiguration> fetchJobs() {
-    return this.jobStore.fetchJobs();
-  }
-
-  @Override
-  public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
-    return this.jobStore.fetchJob(jobKey);
-  }
-
-  @Override
-  public Optional<IScheduledTask> fetchTask(String taskId) {
-    return this.taskStore.fetchTask(taskId);
-  }
-
-  @Override
-  public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
-    return this.taskStore.fetchTasks(query);
-  }
-
-  @Override
-  public Set<IJobKey> getJobKeys() {
-    return this.taskStore.getJobKeys();
-  }
-
-  @Override
-  public Optional<IResourceAggregate> fetchQuota(String role) {
-    return this.quotaStore.fetchQuota(role);
-  }
-
-  @Override
-  public Map<String, IResourceAggregate> fetchQuotas() {
-    return this.quotaStore.fetchQuotas();
-  }
-
-  @Override
-  public Optional<IHostAttributes> getHostAttributes(String host) {
-    return this.attributeStore.getHostAttributes(host);
-  }
-
-  @Override
-  public Set<IHostAttributes> getHostAttributes() {
-    return this.attributeStore.getHostAttributes();
-  }
-
-  @Override
-  public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
-    return this.jobUpdateStore.fetchJobUpdates(query);
-  }
-
-  @Override
-  public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
-    return this.jobUpdateStore.fetchJobUpdate(key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 2cc567d..a519b07 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -83,11 +83,13 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.UUIDGenerator;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IHostStatus;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -99,7 +101,6 @@ import org.apache.aurora.scheduler.storage.entities.IMetadata;
 import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 import org.apache.aurora.scheduler.thrift.aop.AnnotatedAuroraAdmin;
 import org.apache.aurora.scheduler.thrift.aop.ThriftWorkload;
 import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift;
@@ -167,6 +168,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
   private final ConfigurationManager configurationManager;
   private final Thresholds thresholds;
   private final NonVolatileStorage storage;
+  private final DistributedSnapshotStore snapshotStore;
   private final StorageBackup backup;
   private final Recovery recovery;
   private final MaintenanceController maintenance;
@@ -195,6 +197,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
       ConfigurationManager configurationManager,
       Thresholds thresholds,
       NonVolatileStorage storage,
+      DistributedSnapshotStore snapshotStore,
       StorageBackup backup,
       Recovery recovery,
       CronJobManager cronJobManager,
@@ -211,6 +214,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
     this.configurationManager = requireNonNull(configurationManager);
     this.thresholds = requireNonNull(thresholds);
     this.storage = requireNonNull(storage);
+    this.snapshotStore = requireNonNull(snapshotStore);
     this.backup = requireNonNull(backup);
     this.recovery = requireNonNull(recovery);
     this.maintenance = requireNonNull(maintenance);
@@ -635,7 +639,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
 
   @Override
   public Response snapshot() {
-    storage.snapshot();
+    snapshotStore.snapshot();
     return ok();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
index 8cf6871..e82b637 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
@@ -36,11 +36,6 @@ public class FakeNonVolatileStorage implements NonVolatileStorage {
   }
 
   @Override
-  public void snapshot() throws StorageException {
-    // No-op.
-  }
-
-  @Override
   public void start(Quiet initializationLogic) throws StorageException {
     // No-op.
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index c639ab6..aeb8685 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -26,6 +26,7 @@ import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.util.Modules;
 
+import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.TierModule;
 import org.apache.aurora.scheduler.app.SchedulerMain;
 import org.apache.aurora.scheduler.app.local.simulator.ClusterSimulatorModule;
@@ -82,7 +83,17 @@ public final class LocalSchedulerMain {
       protected void configure() {
         bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class));
         bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
-        bind(DistributedSnapshotStore.class).toInstance(snapshot -> { });
+        bind(DistributedSnapshotStore.class).toInstance(new DistributedSnapshotStore() {
+          @Override
+          public void snapshot() throws Storage.StorageException {
+            // no-op
+          }
+
+          @Override
+          public void snapshotWith(Snapshot snapshot) {
+            // no-op
+          }
+        });
       }
     };
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
index 7138d6b..09560f4 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
@@ -98,7 +98,7 @@ public class RecoveryTest extends EasyMockTest {
     Capture<MutateWork<Object, Exception>> transaction = createCapture();
     expect(primaryStorage.write(capture(transaction))).andReturn(null);
     Capture<Snapshot> snapshot = createCapture();
-    distributedStore.persist(capture(snapshot));
+    distributedStore.snapshotWith(capture(snapshot));
     shutDownNow.execute();
 
     control.replay();
@@ -127,7 +127,7 @@ public class RecoveryTest extends EasyMockTest {
     Capture<MutateWork<Object, Exception>> transaction = createCapture();
     expect(primaryStorage.write(capture(transaction))).andReturn(null);
     Capture<Snapshot> snapshot = createCapture();
-    distributedStore.persist(capture(snapshot));
+    distributedStore.snapshotWith(capture(snapshot));
     shutDownNow.execute();
 
     control.replay();


[4/4] aurora git commit: Extract a storage Persistence layer

Posted by wf...@apache.org.
Extract a storage Persistence layer

This extracts the `Log`- and `Snapshot`-specific details from `LogStorage`;
what remains becomes `DurableStorage`.  `DurableStorage` is useful as a
general-purpose `Storage` mutation observer, with `Persistence` defining the
minimal behavior that an underlying durability layer must provide.

Reviewed at https://reviews.apache.org/r/64234/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/cea43db9
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/cea43db9
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/cea43db9

Branch: refs/heads/master
Commit: cea43db9ded1201f69a85a43fb67244c69cf5347
Parents: de8b375
Author: Bill Farner <wf...@apache.org>
Authored: Sat Dec 2 19:59:03 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Sat Dec 2 19:59:03 2017 -0800

----------------------------------------------------------------------
 .../apache/aurora/codec/ThriftBinaryCodec.java  |   2 +-
 .../aurora/scheduler/base/TaskTestUtil.java     |   2 +-
 .../configuration/ConfigurationManager.java     |   2 +-
 .../scheduler/resources/ResourceManager.java    |   2 +-
 .../storage/CallOrderEnforcingStorage.java      |   6 -
 .../storage/DistributedSnapshotStore.java       |  15 +-
 .../aurora/scheduler/storage/Storage.java       |  10 -
 .../scheduler/storage/backup/Recovery.java      |   2 +-
 .../storage/backup/TemporaryStorage.java        |   2 +-
 .../storage/durability/DurableStorage.java      | 350 ++++++++
 .../storage/durability/Persistence.java         |  64 ++
 .../storage/durability/ThriftBackfill.java      | 175 ++++
 .../storage/durability/TransactionRecorder.java | 122 +++
 .../storage/durability/WriteAheadStorage.java   | 368 ++++++++
 .../scheduler/storage/log/LogPersistence.java   | 257 ++++++
 .../scheduler/storage/log/LogStorage.java       | 576 ------------
 .../scheduler/storage/log/LogStorageModule.java |  13 +-
 .../storage/log/SnapshotStoreImpl.java          |   1 +
 .../scheduler/storage/log/StreamManager.java    |  15 +-
 .../storage/log/StreamManagerImpl.java          |  46 +-
 .../scheduler/storage/log/ThriftBackfill.java   | 175 ----
 .../storage/log/WriteAheadStorage.java          | 369 --------
 .../thrift/SchedulerThriftInterface.java        |   8 +-
 .../app/local/FakeNonVolatileStorage.java       |   5 -
 .../scheduler/app/local/LocalSchedulerMain.java |  13 +-
 .../scheduler/storage/backup/RecoveryTest.java  |   4 +-
 .../storage/durability/DurableStorageTest.java  | 781 ++++++++++++++++
 .../storage/durability/ThriftBackfillTest.java  | 222 +++++
 .../durability/TransactionRecorderTest.java     |  78 ++
 .../durability/WriteAheadStorageTest.java       | 166 ++++
 .../scheduler/storage/log/LogManagerTest.java   |  86 +-
 .../scheduler/storage/log/LogStorageTest.java   | 897 -------------------
 .../storage/log/NonVolatileStorageTest.java     |   5 +-
 .../storage/log/SnapshotStoreImplIT.java        |   1 +
 .../storage/log/ThriftBackfillTest.java         | 222 -----
 .../storage/log/WriteAheadStorageTest.java      | 165 ----
 .../thrift/SchedulerThriftInterfaceTest.java    |   8 +-
 .../aurora/scheduler/thrift/ThriftIT.java       |   2 +
 38 files changed, 2687 insertions(+), 2550 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
index 3c12532..cdbe359 100644
--- a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
+++ b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
@@ -217,7 +217,7 @@ public final class ThriftBinaryCodec {
   /**
    * Thrown when serialization or deserialization failed.
    */
-  public static class CodingException extends Exception {
+  public static class CodingException extends RuntimeException {
     public CodingException(String message) {
       super(message);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 5fe7b9b..e1f20f4 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -47,10 +47,10 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 import org.apache.mesos.v1.Protos;
 import org.apache.mesos.v1.Protos.ExecutorID;
 import org.apache.mesos.v1.Protos.ExecutorInfo;

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index fa2f39c..f3e98f2 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -40,6 +40,7 @@ import org.apache.aurora.scheduler.base.UserProvidedStrings;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IContainer;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -48,7 +49,6 @@ import org.apache.aurora.scheduler.storage.entities.IResource;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
 import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 
 import static java.util.Objects.requireNonNull;
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
index f9dee22..d093753 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -26,12 +26,12 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IResource;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 import org.apache.mesos.v1.Protos.Resource;
 
 import static org.apache.aurora.scheduler.resources.ResourceType.BY_MESOS_NAME;

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index 1b10ec5..25fd315 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -132,12 +132,6 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
     return wrapped.write(work);
   }
 
-  @Override
-  public void snapshot() throws StorageException {
-    checkState(State.READY);
-    wrapped.snapshot();
-  }
-
   /**
    * Creates a binding module that will wrap a storage class with {@link CallOrderEnforcingStorage},
    * exposing the order-enforced storage as {@link Storage} and {@link NonVolatileStorage}.

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
index 4ddee40..0c6a955 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
@@ -15,18 +15,25 @@ package org.apache.aurora.scheduler.storage;
 
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
 
 /**
  * A distributed snapshot store that supports persisting globally-visible snapshots.
  */
 public interface DistributedSnapshotStore {
+
+  /**
+   * Clean up the underlying storage by optimizing internal data structures. Does not change
+   * externally-visible state but might not run concurrently with write operations.
+   */
+  void snapshot() throws StorageException;
+
   /**
-   * Writes a snapshot to the distributed storage system.
-   * TODO(William Farner): Currently we're hiding some exceptions (which happen to be
-   * RuntimeExceptions).  Clean these up to be checked, and throw another exception type here.
+   * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an
+   * internally-generated one based on the current state.
    *
    * @param snapshot Snapshot to write.
    * @throws CodingException If the snapshot could not be serialized.
    */
-  void persist(Snapshot snapshot) throws CodingException;
+  void snapshotWith(Snapshot snapshot) throws CodingException;
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 7d325b6..c9ea1de 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -196,10 +196,6 @@ public interface Storage {
    * Executes the unit of read-only {@code work}.  The consistency model creates the possibility
    * for a reader to read uncommitted state from a concurrent writer.
    * <p>
-   * TODO(wfarner): Update this documentation once all stores are backed by
-   * {@link org.apache.aurora.scheduler.storage.db.DbStorage}, as the concurrency behavior will then
-   * be dictated by the {@link org.mybatis.guice.transactional.Transactional#isolation()} used.
-   * <p>
    * TODO(wfarner): This method no longer needs to exist now that there is no global locking for
    * reads.  We could instead directly inject the individual stores where they are used, as long
    * as the stores have a layer to replicate what is currently done by
@@ -253,12 +249,6 @@ public interface Storage {
     void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException;
 
     /**
-     * Clean up the underlying storage by optimizing internal data structures. Does not change
-     * externally-visible state but might not run concurrently with write operations.
-     */
-    void snapshot() throws StorageException;
-
-    /**
      * Prepares the underlying storage system for clean shutdown.
      */
     void stop();

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
index 6cd5b2b..3a62f02 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -197,7 +197,7 @@ public interface Recovery {
       void commit() {
         primaryStorage.write((NoResult.Quiet) storeProvider -> {
           try {
-            distributedStore.persist(tempStorage.toSnapshot());
+            distributedStore.snapshotWith(tempStorage.toSnapshot());
             shutDownNow.execute();
           } catch (CodingException e) {
             throw new IllegalStateException("Failed to encode snapshot.", e);

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 3000796..18296b0 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -27,9 +27,9 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 
 import static java.util.Objects.requireNonNull;

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
new file mode 100644
index 0000000..85b2113
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A storage implementation that ensures storage mutations are written to a persistence layer.
+ *
+ * <p>In the classic write-ahead log usage we'd perform mutations as follows:
+ * <ol>
+ *   <li>record op</li>
+ *   <li>perform op locally</li>
+ *   <li>persist ops</li>
+ * </ol>
+ *
+ * <p>Writing the operation to persistence ensures we have a record of our mutation in case we
+ * should need to recover state later after a crash or on a new host (assuming the scheduler is
+ * distributed).  We then apply the mutation to a local (in-memory) data structure for serving fast
+ * read requests.
+ *
+ * <p>This implementation leverages a local transaction to handle this:
+ * <ol>
+ *   <li>start local transaction</li>
+ *   <li>perform op locally (uncommitted!)</li>
+ *   <li>write op to persistence</li>
+ * </ol>
+ *
+ * <p>If the op fails to apply to local storage we will never persist the op, and if the op
+ * fails to persist, it'll throw and abort the local storage operation as well.
+ */
+public class DurableStorage implements NonVolatileStorage {
+
+  /**
+   * A maintainer for context about open transactions. Assumes that an external entity is
+   * responsible for opening and closing transactions.
+   */
+  interface TransactionManager {
+
+    /**
+     * Checks whether there is an open transaction.
+     *
+     * @return {@code true} if there is an open transaction, {@code false} otherwise.
+     */
+    boolean hasActiveTransaction();
+
+    /**
+     * Adds an operation to the existing transaction.
+     *
+     * @param op Operation to include in the existing transaction.
+     */
+    void log(Op op);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(DurableStorage.class);
+
+  private final Persistence persistence;
+  private final Storage writeBehindStorage;
+  private final SchedulerStore.Mutable writeBehindSchedulerStore;
+  private final CronJobStore.Mutable writeBehindJobStore;
+  private final TaskStore.Mutable writeBehindTaskStore;
+  private final QuotaStore.Mutable writeBehindQuotaStore;
+  private final AttributeStore.Mutable writeBehindAttributeStore;
+  private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
+  private final ReentrantLock writeLock;
+  private final ThriftBackfill thriftBackfill;
+
+  private final WriteAheadStorage writeAheadStorage;
+
+  // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
+  // recovering are controlled at this layer (they're all calls to Mutable store implementations).
+  // The more involved change is changing SnapshotStore to accept a Mutable store provider to
+  // avoid a call to Storage.write() when we replay a Snapshot.
+  private boolean recovered = false;
+  private TransactionRecorder transaction = null;
+
+  private final SlidingStats writerWaitStats = new SlidingStats("storage_write_lock_wait", "ns");
+
+  private final Map<Op._Fields, Consumer<Op>> transactionReplayActions;
+
+  @Inject
+  DurableStorage(
+      Persistence persistence,
+      @Volatile Storage delegateStorage,
+      @Volatile SchedulerStore.Mutable schedulerStore,
+      @Volatile CronJobStore.Mutable jobStore,
+      @Volatile TaskStore.Mutable taskStore,
+      @Volatile QuotaStore.Mutable quotaStore,
+      @Volatile AttributeStore.Mutable attributeStore,
+      @Volatile JobUpdateStore.Mutable jobUpdateStore,
+      EventSink eventSink,
+      ReentrantLock writeLock,
+      ThriftBackfill thriftBackfill) {
+
+    this.persistence = requireNonNull(persistence);
+
+    // DurableStorage has two distinct operating modes: pre- and post-recovery.  When recovering,
+    // we write directly to the writeBehind stores since we are replaying what's already persisted.
+    // After that, all writes must succeed in Persistence before they may be considered successful.
+    this.writeBehindStorage = requireNonNull(delegateStorage);
+    this.writeBehindSchedulerStore = requireNonNull(schedulerStore);
+    this.writeBehindJobStore = requireNonNull(jobStore);
+    this.writeBehindTaskStore = requireNonNull(taskStore);
+    this.writeBehindQuotaStore = requireNonNull(quotaStore);
+    this.writeBehindAttributeStore = requireNonNull(attributeStore);
+    this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
+    this.writeLock = requireNonNull(writeLock);
+    this.thriftBackfill = requireNonNull(thriftBackfill);
+    TransactionManager transactionManager = new TransactionManager() {
+      @Override
+      public boolean hasActiveTransaction() {
+        return transaction != null;
+      }
+
+      @Override
+      public void log(Op op) {
+        transaction.add(op);
+      }
+    };
+    this.writeAheadStorage = new WriteAheadStorage(
+        transactionManager,
+        schedulerStore,
+        jobStore,
+        taskStore,
+        quotaStore,
+        attributeStore,
+        jobUpdateStore,
+        LoggerFactory.getLogger(WriteAheadStorage.class),
+        eventSink);
+
+    this.transactionReplayActions = buildTransactionReplayActions();
+  }
+
+  @VisibleForTesting
+  final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() {
+    return ImmutableMap.<Op._Fields, Consumer<Op>>builder()
+        .put(
+            Op._Fields.SAVE_FRAMEWORK_ID,
+            op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
+        .put(Op._Fields.SAVE_CRON_JOB, op -> {
+          SaveCronJob cronJob = op.getSaveCronJob();
+          writeBehindJobStore.saveAcceptedJob(
+              thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig()));
+        })
+        .put(
+            Op._Fields.REMOVE_JOB,
+            op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())))
+        .put(
+            Op._Fields.SAVE_TASKS,
+            op -> writeBehindTaskStore.saveTasks(
+                thriftBackfill.backfillTasks(op.getSaveTasks().getTasks())))
+        .put(
+            Op._Fields.REMOVE_TASKS,
+            op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()))
+        .put(Op._Fields.SAVE_QUOTA, op -> {
+          SaveQuota saveQuota = op.getSaveQuota();
+          writeBehindQuotaStore.saveQuota(
+              saveQuota.getRole(),
+              ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
+        })
+        .put(
+            Op._Fields.REMOVE_QUOTA,
+            op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()))
+        .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> {
+          HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
+          // Prior to commit 5cf760b, the store would persist maintenance mode changes for
+          // unknown hosts.  5cf760b began rejecting these, but the storage may still
+          // contain entries with a null slave ID.
+          if (attributes.isSetSlaveId()) {
+            writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
+          } else {
+            LOG.info("Dropping host attributes with no agent ID: " + attributes);
+          }
+        })
+        .put(Op._Fields.SAVE_JOB_UPDATE, op ->
+          writeBehindJobUpdateStore.saveJobUpdate(
+              thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())))
+        .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
+          SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
+          writeBehindJobUpdateStore.saveJobUpdateEvent(
+              IJobUpdateKey.build(event.getKey()),
+              IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
+        })
+        .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> {
+          SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
+          writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
+              IJobUpdateKey.build(event.getKey()),
+              IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
+        })
+        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
+          LOG.info("Dropping prune operation.  Updates will be pruned later.");
+        })
+        .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
+          writeBehindJobUpdateStore.removeJobUpdates(
+              IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
+        .build();
+  }
+
+  @Override
+  @Timed("scheduler_storage_prepare")
+  public synchronized void prepare() {
+    writeBehindStorage.prepare();
+    persistence.prepare();
+  }
+
+  @Override
+  @Timed("scheduler_storage_start")
+  public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
+    write((NoResult.Quiet) unused -> {
+      // Must have the underlying storage started so we can query it.
+      // We replay these entries in the forwarded storage system's transactions but not ours - we
+      // do not want to re-record these ops.
+      recover();
+      recovered = true;
+
+      // Now that we're recovered we should persist any mutations done in initializationLogic, so
+      // run it in one of our transactions.
+      write(initializationLogic);
+    });
+  }
+
+  @Override
+  public void stop() {
+    // No-op.
+  }
+
+  @Timed("scheduler_storage_recover")
+  void recover() throws RecoveryFailedException {
+    try {
+      persistence.recover().forEach(DurableStorage.this::replayOp);
+    } catch (PersistenceException e) {
+      throw new RecoveryFailedException(e);
+    }
+  }
+
+  private static final class RecoveryFailedException extends SchedulerException {
+    RecoveryFailedException(Throwable cause) {
+      super(cause);
+    }
+  }
+
+  private void replayOp(Op op) {
+    Op._Fields opField = op.getSetField();
+    if (!transactionReplayActions.containsKey(opField)) {
+      throw new IllegalStateException("Unknown transaction op: " + opField);
+    }
+
+    transactionReplayActions.get(opField).accept(op);
+  }
+
+  private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
+      throws StorageException, E {
+
+    // The transaction has already been set up so we just need to delegate with our store provider
+    // so any mutations may be persisted.
+    if (transaction != null) {
+      return work.apply(writeAheadStorage);
+    }
+
+    transaction = new TransactionRecorder();
+    try {
+      return writeBehindStorage.write(unused -> {
+        T result = work.apply(writeAheadStorage);
+        List<Op> ops = transaction.getOps();
+        if (!ops.isEmpty()) {
+          try {
+            persistence.persist(ops.stream());
+          } catch (PersistenceException e) {
+            throw new StorageException("Failed to persist storage changes", e);
+          }
+        }
+        return result;
+      });
+    } finally {
+      transaction = null;
+    }
+  }
+
+  @Override
+  public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
+    long waitStart = System.nanoTime();
+    writeLock.lock();
+    try {
+      writerWaitStats.accumulate(System.nanoTime() - waitStart);
+      // We don't want to persist when recovering, we just want to update the underlying
+      // store - so pass mutations straight through to the underlying storage.
+      if (!recovered) {
+        return writeBehindStorage.write(work);
+      }
+
+      return doInTransaction(work);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
+    return writeBehindStorage.read(work);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
new file mode 100644
index 0000000..9eb862c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.stream.Stream;
+
+import org.apache.aurora.gen.storage.Op;
+
+/**
+ * Persistence layer for storage operations.
+ */
+public interface Persistence {
+
+  /**
+   * Prepares the persistence layer.  The implementation may use this, for example, to advertise as
+   * a replica to cohort schedulers, or begin syncing state for warm standby.
+   */
+  void prepare();
+
+  /**
+   * Recovers previously-persisted records.
+   *
+   * @return All persisted records.
+   * @throws PersistenceException If recovery failed.
+   */
+  Stream<Op> recover() throws PersistenceException;
+
+  /**
+   * Saves new records.  No records may be considered durably saved until this method returns
+   * successfully.
+   *
+   * @param records Records to save.
+   * @throws PersistenceException If the records could not be saved.
+   */
+  void persist(Stream<Op> records) throws PersistenceException;
+
+  /**
+   * Thrown when a persistence operation fails.
+   */
+  class PersistenceException extends Exception {
+    public PersistenceException(String msg) {
+      super(msg);
+    }
+
+    public PersistenceException(Throwable cause) {
+      super(cause);
+    }
+
+    public PersistenceException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
new file mode 100644
index 0000000..4425d02
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.inject.Inject;
+
+import org.apache.aurora.GuavaUtils;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.Resource;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TierInfo;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IResource;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+
+/**
+ * Helps migrate the thrift schema by populating deprecated and/or replacement fields.
+ */
+public final class ThriftBackfill {
+
+  private final TierManager tierManager;
+
+  @Inject
+  public ThriftBackfill(TierManager tierManager) {
+    this.tierManager = requireNonNull(tierManager);
+  }
+
+  private static Resource getResource(Set<Resource> resources, ResourceType type) {
+    return resources.stream()
+            .filter(e -> ResourceType.fromResource(IResource.build(e)).equals(type))
+            .findFirst()
+            .orElseThrow(() ->
+                    new IllegalArgumentException("Missing resource definition for " + type));
+  }
+
+  /**
+   * Ensures TaskConfig.tier and TaskConfig.production are both populated and consistent.
+   *
+   * @param config TaskConfig to backfill.
+   * @return Backfilled TaskConfig.
+   */
+  public TaskConfig backfillTask(TaskConfig config) {
+    backfillTier(config);
+    return config;
+  }
+
+  private void backfillTier(TaskConfig config) {
+    ITaskConfig taskConfig = ITaskConfig.build(config);
+    if (config.isSetTier()) {
+      TierInfo tier = tierManager.getTier(taskConfig);
+      config.setProduction(!tier.isPreemptible() && !tier.isRevocable());
+    } else {
+      config.setTier(tierManager.getTiers()
+          .entrySet()
+          .stream()
+          .filter(e -> e.getValue().isPreemptible() == !taskConfig.isProduction()
+              && !e.getValue().isRevocable())
+          .findFirst()
+          .orElseThrow(() -> new IllegalStateException(
+              format("No matching implicit tier for task of job %s", taskConfig.getJob())))
+          .getKey());
+    }
+  }
+
+  /**
+   * Backfills JobConfiguration. See {@link #backfillTask(TaskConfig)}.
+   *
+   * @param jobConfig JobConfiguration to backfill.
+   * @return Backfilled JobConfiguration.
+   */
+  public IJobConfiguration backfillJobConfiguration(JobConfiguration jobConfig) {
+    backfillTask(jobConfig.getTaskConfig());
+    return IJobConfiguration.build(jobConfig);
+  }
+
+  /**
+   * Backfills a set of tasks. See {@link #backfillTask(TaskConfig)}.
+   *
+   * @param tasks Set of tasks to backfill.
+   * @return Backfilled set of tasks.
+   */
+  public Set<IScheduledTask> backfillTasks(Set<ScheduledTask> tasks) {
+    return tasks.stream()
+        .map(t -> backfillScheduledTask(t))
+        .map(IScheduledTask::build)
+        .collect(GuavaUtils.toImmutableSet());
+  }
+
+  /**
+   * Ensures ResourceAggregate.resources and corresponding deprecated fields are all populated.
+   *
+   * @param aggregate ResourceAggregate to backfill.
+   * @return Backfilled IResourceAggregate.
+   */
+  public static IResourceAggregate backfillResourceAggregate(ResourceAggregate aggregate) {
+    if (!aggregate.isSetResources() || aggregate.getResources().isEmpty()) {
+      aggregate.addToResources(Resource.numCpus(aggregate.getNumCpus()));
+      aggregate.addToResources(Resource.ramMb(aggregate.getRamMb()));
+      aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb()));
+    } else {
+      EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES;
+      if (aggregate.getResources().size() > quotaResources.size()) {
+        throw new IllegalArgumentException("Too many resource values in quota.");
+      }
+
+      if (!quotaResources.equals(aggregate.getResources().stream()
+              .map(e -> ResourceType.fromResource(IResource.build(e)))
+              .collect(Collectors.toSet()))) {
+
+        throw new IllegalArgumentException("Quota resources must be exactly: " + quotaResources);
+      }
+      aggregate.setNumCpus(
+              getResource(aggregate.getResources(), CPUS).getNumCpus());
+      aggregate.setRamMb(
+              getResource(aggregate.getResources(), RAM_MB).getRamMb());
+      aggregate.setDiskMb(
+              getResource(aggregate.getResources(), DISK_MB).getDiskMb());
+    }
+    return IResourceAggregate.build(aggregate);
+  }
+
+  private ScheduledTask backfillScheduledTask(ScheduledTask task) {
+    backfillTask(task.getAssignedTask().getTask());
+    return task;
+  }
+
+  /**
+   * Backfills JobUpdate. See {@link #backfillTask(TaskConfig)}.
+   *
+   * @param update JobUpdate to backfill.
+   * @return Backfilled job update.
+   */
+  public IJobUpdate backFillJobUpdate(JobUpdate update) {
+    JobUpdateInstructions instructions = update.getInstructions();
+    if (instructions.isSetDesiredState()) {
+      backfillTask(instructions.getDesiredState().getTask());
+    }
+
+    instructions.getInitialState().forEach(e -> backfillTask(e.getTask()));
+
+    return IJobUpdate.build(update);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
new file mode 100644
index 0000000..1c811e3
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+
+/**
+ * Records a sequence of mutations to the storage.
+ *
+ * <p>Each new op is coalesced into the most recently recorded op when both are of a compatible
+ * type, which compacts the recorded sequence.
+ */
+class TransactionRecorder {
+  private final List<Op> ops = Lists.newArrayList();
+
+  /**
+   * Records an op, merging it into the previously recorded op when possible.
+   *
+   * @param op Op to record.
+   */
+  void add(Op op) {
+    Op prior = Iterables.getLast(ops, null);
+    if (prior == null || !coalesce(prior, op)) {
+      ops.add(op);
+    }
+  }
+
+  /**
+   * Gets the recorded ops in the order they were added (after coalescing).
+   *
+   * @return The live list of recorded ops, not a copy.
+   */
+  List<Op> getOps() {
+    return ops;
+  }
+
+  /**
+   * Tries to coalesce a new op into the prior to compact the binary representation and increase
+   * batching.
+   *
+   * @param prior The previous op.
+   * @param next The next op to be added.
+   * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
+   */
+  private boolean coalesce(Op prior, Op next) {
+    // An op with no field set cannot be merged with anything. Either side being unset must bail
+    // out here; if only the prior were unset, its null field type would be dereferenced below.
+    if (!prior.isSet() || !next.isSet()) {
+      return false;
+    }
+
+    Op._Fields priorType = prior.getSetField();
+    if (!priorType.equals(next.getSetField())) {
+      return false;
+    }
+
+    switch (priorType) {
+      case SAVE_FRAMEWORK_ID:
+        // Only the most recent framework ID is kept.
+        prior.setSaveFrameworkId(next.getSaveFrameworkId());
+        return true;
+      case SAVE_TASKS:
+        coalesce(prior.getSaveTasks(), next.getSaveTasks());
+        return true;
+      case REMOVE_TASKS:
+        coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
+        return true;
+      case SAVE_HOST_ATTRIBUTES:
+        return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
+      default:
+        // All other op types are recorded individually.
+        return false;
+    }
+  }
+
+  /**
+   * Merges the tasks of {@code next} into {@code prior}. A task in {@code next} replaces any task
+   * in {@code prior} with the same task ID.
+   *
+   * @param prior Op that receives the merged tasks.
+   * @param next Op whose tasks are merged in.
+   */
+  private void coalesce(SaveTasks prior, SaveTasks next) {
+    if (next.isSetTasks()) {
+      if (prior.isSetTasks()) {
+        // It is an expected invariant that an operation may reference a task (identified by
+        // task ID) no more than one time.  Therefore, to coalesce two SaveTasks operations,
+        // the most recent task definition overrides the prior operation.
+        Map<String, ScheduledTask> coalesced = Maps.newHashMap();
+        for (ScheduledTask task : prior.getTasks()) {
+          coalesced.put(task.getAssignedTask().getTaskId(), task);
+        }
+        for (ScheduledTask task : next.getTasks()) {
+          coalesced.put(task.getAssignedTask().getTaskId(), task);
+        }
+        prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
+      } else {
+        prior.setTasks(next.getTasks());
+      }
+    }
+  }
+
+  /**
+   * Merges the task IDs of {@code next} into {@code prior}.
+   *
+   * @param prior Op that receives the union of task IDs.
+   * @param next Op whose task IDs are merged in.
+   */
+  private void coalesce(RemoveTasks prior, RemoveTasks next) {
+    if (next.isSetTaskIds()) {
+      if (prior.isSetTaskIds()) {
+        prior.setTaskIds(ImmutableSet.<String>builder()
+            .addAll(prior.getTaskIds())
+            .addAll(next.getTaskIds())
+            .build());
+      } else {
+        prior.setTaskIds(next.getTaskIds());
+      }
+    }
+  }
+
+  /**
+   * Merges {@code next} into {@code prior} when both save attributes for the same host.
+   *
+   * @param prior Op that receives the merged attributes.
+   * @param next Op whose attributes are merged in.
+   * @return {@code true} if the ops were merged, {@code false} if they refer to different hosts.
+   */
+  private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
+    if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
+      // NOTE(review): only the attribute list is carried over from the later save; any other
+      // fields the later save sets keep the prior save's values. Confirm this matches the
+      // attribute store's save semantics.
+      prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
+      return true;
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
new file mode 100644
index 0000000..667db06
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.slf4j.Logger;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
+ * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
+ * stores.
+ *
+ * <p>Read operations are forwarded directly to the delegate stores.
+ */
+public class WriteAheadStorage implements
+    MutableStoreProvider,
+    SchedulerStore.Mutable,
+    CronJobStore.Mutable,
+    TaskStore.Mutable,
+    QuotaStore.Mutable,
+    AttributeStore.Mutable,
+    JobUpdateStore.Mutable {
+
+  private final TransactionManager transactionManager;
+  private final SchedulerStore.Mutable schedulerStore;
+  private final CronJobStore.Mutable jobStore;
+  private final TaskStore.Mutable taskStore;
+  private final QuotaStore.Mutable quotaStore;
+  private final AttributeStore.Mutable attributeStore;
+  private final JobUpdateStore.Mutable jobUpdateStore;
+  private final Logger log;
+  private final EventSink eventSink;
+
+  /**
+   * Creates a new write-ahead storage that delegates to the providing default stores.
+   *
+   * @param transactionManager External controller for transaction operations.
+   * @param schedulerStore Delegate.
+   * @param jobStore       Delegate.
+   * @param taskStore      Delegate.
+   * @param quotaStore     Delegate.
+   * @param attributeStore Delegate.
+   * @param jobUpdateStore Delegate.
+   * @param log            Logger for debug output about mutated tasks being recorded.
+   * @param eventSink      Sink notified when saved host attributes have changed.
+   */
+  public WriteAheadStorage(
+      TransactionManager transactionManager,
+      SchedulerStore.Mutable schedulerStore,
+      CronJobStore.Mutable jobStore,
+      TaskStore.Mutable taskStore,
+      QuotaStore.Mutable quotaStore,
+      AttributeStore.Mutable attributeStore,
+      JobUpdateStore.Mutable jobUpdateStore,
+      Logger log,
+      EventSink eventSink) {
+
+    this.transactionManager = requireNonNull(transactionManager);
+    this.schedulerStore = requireNonNull(schedulerStore);
+    this.jobStore = requireNonNull(jobStore);
+    this.taskStore = requireNonNull(taskStore);
+    this.quotaStore = requireNonNull(quotaStore);
+    this.attributeStore = requireNonNull(attributeStore);
+    this.jobUpdateStore = requireNonNull(jobUpdateStore);
+    this.log = requireNonNull(log);
+    this.eventSink = requireNonNull(eventSink);
+  }
+
+  /**
+   * Verifies that a transaction is active, so that mutations may be recorded.
+   *
+   * @throws IllegalStateException If there is no active transaction.
+   */
+  private void requireActiveTransaction() {
+    Preconditions.checkState(
+        transactionManager.hasActiveTransaction(),
+        "Mutating operations must be within a transaction.");
+  }
+
+  /**
+   * Records an op with the transaction manager.
+   *
+   * @param op Op to record.
+   * @throws IllegalStateException If there is no active transaction.
+   */
+  private void write(Op op) {
+    requireActiveTransaction();
+    transactionManager.log(op);
+  }
+
+  @Override
+  public void saveFrameworkId(final String frameworkId) {
+    requireNonNull(frameworkId);
+
+    write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+    schedulerStore.saveFrameworkId(frameworkId);
+  }
+
+  @Override
+  public void deleteTasks(final Set<String> taskIds) {
+    requireNonNull(taskIds);
+
+    write(Op.removeTasks(new RemoveTasks(taskIds)));
+    taskStore.deleteTasks(taskIds);
+  }
+
+  @Override
+  public void saveTasks(final Set<IScheduledTask> newTasks) {
+    requireNonNull(newTasks);
+
+    write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
+    taskStore.saveTasks(newTasks);
+  }
+
+  @Override
+  public Optional<IScheduledTask> mutateTask(
+      String taskId,
+      Function<IScheduledTask, IScheduledTask> mutator) {
+
+    // The delegate must be mutated before the op can be built, so check the transaction first;
+    // otherwise a failed check would leave the delegate changed with nothing recorded.
+    requireActiveTransaction();
+    Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
+    // An absent result carries no task to record (presumably the task was not found), so it is
+    // passed back to the caller as-is rather than dereferenced.
+    if (mutated.isPresent()) {
+      log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
+      write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+    }
+
+    return mutated;
+  }
+
+  @Override
+  public void saveQuota(final String role, final IResourceAggregate quota) {
+    requireNonNull(role);
+    requireNonNull(quota);
+
+    write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+    quotaStore.saveQuota(role, quota);
+  }
+
+  @Override
+  public boolean saveHostAttributes(final IHostAttributes attrs) {
+    requireNonNull(attrs);
+
+    // Only changed attributes are recorded and announced.
+    boolean changed = attributeStore.saveHostAttributes(attrs);
+    if (changed) {
+      write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
+      eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
+    }
+    return changed;
+  }
+
+  @Override
+  public void removeJob(final IJobKey jobKey) {
+    requireNonNull(jobKey);
+
+    write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
+    jobStore.removeJob(jobKey);
+  }
+
+  @Override
+  public void saveAcceptedJob(final IJobConfiguration jobConfig) {
+    requireNonNull(jobConfig);
+
+    write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
+    jobStore.saveAcceptedJob(jobConfig);
+  }
+
+  @Override
+  public void removeQuota(final String role) {
+    requireNonNull(role);
+
+    write(Op.removeQuota(new RemoveQuota(role)));
+    quotaStore.removeQuota(role);
+  }
+
+  @Override
+  public void saveJobUpdate(IJobUpdate update) {
+    requireNonNull(update);
+
+    write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+    jobUpdateStore.saveJobUpdate(update);
+  }
+
+  @Override
+  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+    requireNonNull(key);
+    requireNonNull(event);
+
+    write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
+    jobUpdateStore.saveJobUpdateEvent(key, event);
+  }
+
+  @Override
+  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
+    requireNonNull(key);
+    requireNonNull(event);
+
+    write(Op.saveJobInstanceUpdateEvent(
+        new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
+    jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
+  }
+
+  @Override
+  public void removeJobUpdates(Set<IJobUpdateKey> keys) {
+    requireNonNull(keys);
+
+    // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
+    // read it.  JobUpdates are only removed implicitly when a snapshot is taken.
+    jobUpdateStore.removeJobUpdates(keys);
+  }
+
+  @Override
+  public void deleteAllTasks() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteHostAttributes() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteJobs() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteQuotas() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteAllUpdates() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public SchedulerStore.Mutable getSchedulerStore() {
+    return this;
+  }
+
+  @Override
+  public CronJobStore.Mutable getCronJobStore() {
+    return this;
+  }
+
+  @Override
+  public TaskStore.Mutable getUnsafeTaskStore() {
+    return this;
+  }
+
+  @Override
+  public QuotaStore.Mutable getQuotaStore() {
+    return this;
+  }
+
+  @Override
+  public AttributeStore.Mutable getAttributeStore() {
+    return this;
+  }
+
+  @Override
+  public TaskStore getTaskStore() {
+    return this;
+  }
+
+  @Override
+  public JobUpdateStore.Mutable getJobUpdateStore() {
+    return this;
+  }
+
+  @Override
+  public Optional<String> fetchFrameworkId() {
+    return this.schedulerStore.fetchFrameworkId();
+  }
+
+  @Override
+  public Iterable<IJobConfiguration> fetchJobs() {
+    return this.jobStore.fetchJobs();
+  }
+
+  @Override
+  public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
+    return this.jobStore.fetchJob(jobKey);
+  }
+
+  @Override
+  public Optional<IScheduledTask> fetchTask(String taskId) {
+    return this.taskStore.fetchTask(taskId);
+  }
+
+  @Override
+  public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
+    return this.taskStore.fetchTasks(query);
+  }
+
+  @Override
+  public Set<IJobKey> getJobKeys() {
+    return this.taskStore.getJobKeys();
+  }
+
+  @Override
+  public Optional<IResourceAggregate> fetchQuota(String role) {
+    return this.quotaStore.fetchQuota(role);
+  }
+
+  @Override
+  public Map<String, IResourceAggregate> fetchQuotas() {
+    return this.quotaStore.fetchQuotas();
+  }
+
+  @Override
+  public Optional<IHostAttributes> getHostAttributes(String host) {
+    return this.attributeStore.getHostAttributes(host);
+  }
+
+  @Override
+  public Set<IHostAttributes> getHostAttributes() {
+    return this.attributeStore.getHostAttributes();
+  }
+
+  @Override
+  public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
+    return this.jobUpdateStore.fetchJobUpdates(query);
+  }
+
+  @Override
+  public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
+    return this.jobUpdateStore.fetchJobUpdate(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
new file mode 100644
index 0000000..a0a6b6c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.common.application.ShutdownRegistry;
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.gen.storage.LogEntry;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Persistence layer that uses a replicated log.
+ *
+ * <p>{@link #prepare()} must be called before any other operation: it opens the log stream that
+ * the remaining operations read from and write to.
+ */
+class LogPersistence implements Persistence, DistributedSnapshotStore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogPersistence.class);
+
+  private final LogManager logManager;
+  private final SnapshotStore<Snapshot> snapshotStore;
+  private final SchedulingService schedulingService;
+  private final Amount<Long, Time> snapshotInterval;
+  // Opened by prepare(); null before that.
+  private StreamManager streamManager;
+
+  @Inject
+  LogPersistence(
+      Settings settings,
+      LogManager logManager,
+      SnapshotStore<Snapshot> snapshotStore,
+      ShutdownRegistry shutdownRegistry) {
+
+    this(new ScheduledExecutorSchedulingService(
+            shutdownRegistry,
+            settings.getShutdownGracePeriod()),
+        settings.getSnapshotInterval(),
+        logManager,
+        snapshotStore);
+  }
+
+  @VisibleForTesting
+  LogPersistence(
+      SchedulingService schedulingService,
+      Amount<Long, Time> snapshotInterval,
+      LogManager logManager,
+      SnapshotStore<Snapshot> snapshotStore) {
+
+    this.schedulingService = requireNonNull(schedulingService);
+    this.snapshotInterval = requireNonNull(snapshotInterval);
+    this.logManager = requireNonNull(logManager);
+    this.snapshotStore = requireNonNull(snapshotStore);
+  }
+
+  @Override
+  public void prepare() {
+    // Open the log to make a log replica available to the scheduler group.
+    try {
+      streamManager = logManager.open();
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to open the log, cannot continue", e);
+    }
+  }
+
+  @Override
+  public void persist(Stream<Op> mutations) throws PersistenceException {
+    // All mutations are added to a single log transaction, which is committed once.
+    StreamTransaction transaction = streamManager.startTransaction();
+    mutations.forEach(transaction::add);
+    try {
+      transaction.commit();
+    } catch (CodingException e) {
+      throw new PersistenceException(e);
+    }
+  }
+
+  /**
+   * Starts periodic snapshots (when the snapshot interval is positive), then replays the log from
+   * the beginning.
+   *
+   * <p>The returned stream is lazy: log entries are iterated, and snapshot entries are applied to
+   * the snapshot store, only as the stream is consumed. Consequently any exception raised while
+   * iterating entries reaches the consumer directly instead of being wrapped in a
+   * {@link PersistenceException} by this method.
+   *
+   * @return The ops of all transaction entries following the applied snapshots, in log order.
+   * @throws PersistenceException If the log could not be opened for reading.
+   */
+  @Override
+  public Stream<Op> recover() throws PersistenceException {
+    scheduleSnapshots();
+
+    try {
+      Iterator<LogEntry> entries = streamManager.readFromBeginning();
+      Iterable<LogEntry> iterableEntries = () -> entries;
+      return StreamSupport.stream(iterableEntries.spliterator(), false)
+          .flatMap(this::opsFrom);
+    } catch (CodingException | InvalidPositionException | StreamAccessException e) {
+      throw new PersistenceException(e);
+    }
+  }
+
+  /**
+   * Expands a single log entry into the ops it contributes to recovery.
+   *
+   * <p>No-op entries contribute nothing. Snapshot entries are applied to the snapshot store and
+   * contribute nothing further. Transaction entries contribute their ops.
+   *
+   * @param entry Log entry being replayed.
+   * @return The ops to replay for {@code entry}.
+   * @throws IllegalStateException If the entry is of any other (or no) type.
+   */
+  private Stream<Op> opsFrom(LogEntry entry) {
+    LogEntry._Fields type = entry.getSetField();
+    if (type == LogEntry._Fields.NOOP) {
+      return Stream.empty();
+    }
+    if (type == LogEntry._Fields.SNAPSHOT) {
+      Snapshot snapshot = entry.getSnapshot();
+      LOG.info("Applying snapshot taken on {}", new Date(snapshot.getTimestamp()));
+      snapshotStore.applySnapshot(snapshot);
+      return Stream.empty();
+    }
+    if (type == LogEntry._Fields.TRANSACTION) {
+      return entry.getTransaction().getOps().stream();
+    }
+    throw new IllegalStateException("Unknown log entry type: " + type);
+  }
+
+  /**
+   * Schedules {@link #snapshot()} to run periodically, unless the snapshot interval is not
+   * positive. Snapshot failures reported as {@link StorageException} are logged, not propagated.
+   */
+  private void scheduleSnapshots() {
+    if (snapshotInterval.getValue() > 0) {
+      schedulingService.doEvery(snapshotInterval, () -> {
+        try {
+          snapshot();
+        } catch (StorageException e) {
+          if (e.getCause() == null) {
+            LOG.warn("StorageException when attempting to snapshot.", e);
+          } else {
+            LOG.warn(e.getMessage(), e.getCause());
+          }
+        }
+      });
+    }
+  }
+
+  @Override
+  public void snapshot() throws StorageException {
+    try {
+      doSnapshot();
+    } catch (CodingException e) {
+      throw new StorageException("Failed to encode a snapshot", e);
+    } catch (InvalidPositionException e) {
+      throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
+    } catch (StreamAccessException e) {
+      throw new StorageException("Failed to create a snapshot", e);
+    }
+  }
+
+  @Timed("scheduler_log_snapshot_persist")
+  @Override
+  public void snapshotWith(Snapshot snapshot)
+      throws CodingException, InvalidPositionException, StreamAccessException {
+
+    streamManager.snapshot(snapshot);
+  }
+
+  /**
+   * Forces a snapshot of the storage state.
+   *
+   * @throws CodingException If there is a problem encoding the snapshot.
+   * @throws InvalidPositionException If the log stream cursor is invalid.
+   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
+   */
+  @Timed("scheduler_log_snapshot")
+  void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
+    LOG.info("Creating snapshot.");
+    Snapshot snapshot = snapshotStore.createSnapshot();
+    snapshotWith(snapshot);
+    LOG.info(
+        "Snapshot complete. host attrs: {}, cron jobs: {}, quota confs: {}, tasks: {}, "
+            + "updates: {}",
+        snapshot.getHostAttributesSize(),
+        snapshot.getCronJobsSize(),
+        snapshot.getQuotaConfigurationsSize(),
+        snapshot.getTasksSize(),
+        snapshot.getJobUpdateDetailsSize());
+  }
+
+  /**
+   * A service that can schedule an action to be executed periodically.
+   */
+  @VisibleForTesting
+  interface SchedulingService {
+
+    /**
+     * Schedules an action to execute periodically.
+     *
+     * @param interval The time period to wait until running the {@code action} again.
+     * @param action The action to execute periodically.
+     */
+    void doEvery(Amount<Long, Time> interval, Runnable action);
+  }
+
+  /**
+   * {@link SchedulingService} backed by a single-threaded scheduled executor that is shut down
+   * through the {@link ShutdownRegistry}.
+   */
+  private static class ScheduledExecutorSchedulingService implements SchedulingService {
+    private final ScheduledExecutorService scheduledExecutor;
+
+    ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
+                                       Amount<Long, Time> shutdownGracePeriod) {
+      scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG);
+      shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination(
+          scheduledExecutor,
+          shutdownGracePeriod.getValue(),
+          shutdownGracePeriod.getUnit().getTimeUnit()));
+    }
+
+    @Override
+    public void doEvery(Amount<Long, Time> interval, Runnable action) {
+      requireNonNull(interval);
+      requireNonNull(action);
+
+      // The first run happens after one full interval, not immediately.
+      long delay = interval.getValue();
+      TimeUnit timeUnit = interval.getUnit().getTimeUnit();
+      scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
+    }
+  }
+
+  /**
+   * Configuration settings for log persistence.
+   */
+  public static class Settings {
+    private final Amount<Long, Time> shutdownGracePeriod;
+    private final Amount<Long, Time> snapshotInterval;
+
+    Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
+      this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
+      this.snapshotInterval = requireNonNull(snapshotInterval);
+    }
+
+    public Amount<Long, Time> getShutdownGracePeriod() {
+      return shutdownGracePeriod;
+    }
+
+    public Amount<Long, Time> getSnapshotInterval() {
+      return snapshotInterval;
+    }
+  }
+}


[2/4] aurora git commit: Extract a storage Persistence layer

Posted by wf...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
new file mode 100644
index 0000000..07912b6
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
@@ -0,0 +1,781 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveJobUpdates;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.resources.ResourceTestUtil;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.diskMb;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeConfig;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DurableStorage}.
+ *
+ * <p>Covers two directions. On start, ops returned by {@link Persistence#recover()} are replayed
+ * into the underlying stores, in backfilled form. During write transactions, store mutations are
+ * recorded and handed to {@link Persistence#persist} as a stream of {@link Op}s.
+ */
+public class DurableStorageTest extends EasyMockTest {
+
+  private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "name");
+  private static final IJobUpdateKey UPDATE_ID =
+      IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "testUpdateId"));
+
+  private DurableStorage durableStorage;
+  private Persistence persistence;
+  private StorageTestUtil storageUtil;
+  private EventSink eventSink;
+
+  /**
+   * Builds the {@link DurableStorage} under test from three parts: a mocked {@link Persistence},
+   * the mocked stores provided by {@link StorageTestUtil}, and a mocked {@link EventSink}. Every
+   * test expects the underlying storage to be prepared.
+   */
+  @Before
+  public void setUp() {
+    persistence = createMock(Persistence.class);
+    storageUtil = new StorageTestUtil(this);
+    eventSink = createMock(EventSink.class);
+
+    durableStorage = new DurableStorage(
+        persistence,
+        storageUtil.storage,
+        storageUtil.schedulerStore,
+        storageUtil.jobStore,
+        storageUtil.taskStore,
+        storageUtil.quotaStore,
+        storageUtil.attributeStore,
+        storageUtil.jobUpdateStore,
+        eventSink,
+        new ReentrantLock(),
+        TaskTestUtil.THRIFT_BACKFILL);
+
+    storageUtil.storage.prepare();
+  }
+
+  /**
+   * Checks the start sequence. Starting storage prepares and recovers persistence, replays ops
+   * of every type into the stores, and runs the supplied initialization logic. The test also
+   * checks that a replay handler exists for every {@link Op} field.
+   */
+  @Test
+  public void testStart() throws Exception {
+    // We should initialize persistence.
+    persistence.prepare();
+
+    // Our start should recover persistence and then forward to the underlying storage start of the
+    // supplied initialization logic.
+    AtomicBoolean initialized = new AtomicBoolean(false);
+    MutateWork.NoResult.Quiet initializationLogic = provider -> {
+      // Creating a mock and expecting apply(storeProvider) does not work here for whatever
+      // reason.
+      initialized.set(true);
+    };
+
+    Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
+    storageUtil.storage.write(capture(recoverAndInitializeWork));
+    expectLastCall().andAnswer(() -> {
+      recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
+      return null;
+    });
+
+    Capture<MutateWork<Void, RuntimeException>> initializationWork = createCapture();
+    expect(storageUtil.storage.write(capture(initializationWork))).andAnswer(
+        () -> {
+          initializationWork.getValue().apply(storageUtil.mutableStoreProvider);
+          return null;
+        });
+
+    // Populate all Op types.
+    buildReplayOps();
+
+    control.replay();
+
+    durableStorage.prepare();
+    durableStorage.start(initializationLogic);
+    assertTrue(initialized.get());
+
+    // Assert all Transaction types have handlers defined.
+    assertEquals(
+        EnumSet.allOf(Op._Fields.class),
+        EnumSet.copyOf(durableStorage.buildTransactionReplayActions().keySet()));
+  }
+
+  /**
+   * Records two kinds of expectation. The first is the result of {@link Persistence#recover()}:
+   * one op of each replayable type. The second is the store call each op should produce when
+   * replayed. Ops that carry non-backfilled thrift objects should reach the stores in
+   * backfilled form.
+   */
+  private void buildReplayOps() throws Exception {
+    ImmutableSet.Builder<Op> builder = ImmutableSet.builder();
+
+    builder.add(Op.saveFrameworkId(new SaveFrameworkId("bob")));
+    storageUtil.schedulerStore.saveFrameworkId("bob");
+
+    JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig());
+    JobConfiguration expectedJob =
+        new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder());
+    SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob);
+    builder.add(Op.saveCronJob(cronJob));
+    storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob));
+
+    RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
+    builder.add(Op.removeJob(removeJob));
+    storageUtil.jobStore.removeJob(JOB_KEY);
+
+    ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder();
+    actualTask.getAssignedTask().setTask(nonBackfilledConfig());
+    IScheduledTask expectedTask = makeTask("id", JOB_KEY);
+    SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask));
+    builder.add(Op.saveTasks(saveTasks));
+    storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask));
+
+    RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
+    builder.add(Op.removeTasks(removeTasks));
+    storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
+
+    ResourceAggregate nonBackfilled = new ResourceAggregate()
+        .setNumCpus(1.0)
+        .setRamMb(32)
+        .setDiskMb(64);
+    SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled);
+    builder.add(Op.saveQuota(saveQuota));
+    storageUtil.quotaStore.saveQuota(
+        saveQuota.getRole(),
+        IResourceAggregate.build(nonBackfilled.deepCopy()
+            .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))));
+
+    builder.add(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole())));
+    storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
+
+    // This entry lacks a slave ID, and should therefore be discarded.
+    SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
+        .setHost("host1")
+        .setMode(MaintenanceMode.DRAINED));
+    builder.add(Op.saveHostAttributes(hostAttributes1));
+
+    SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
+        .setHost("host2")
+        .setSlaveId("slave2")
+        .setMode(MaintenanceMode.DRAINED));
+    builder.add(Op.saveHostAttributes(hostAttributes2));
+    expect(storageUtil.attributeStore.saveHostAttributes(
+        IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true);
+
+    JobUpdate actualUpdate = new JobUpdate()
+        .setSummary(new JobUpdateSummary().setKey(UPDATE_ID.newBuilder()))
+        .setInstructions(new JobUpdateInstructions()
+            .setInitialState(
+                ImmutableSet.of(new InstanceTaskConfig().setTask(nonBackfilledConfig())))
+            .setDesiredState(new InstanceTaskConfig().setTask(nonBackfilledConfig())));
+    JobUpdate expectedUpdate = actualUpdate.deepCopy();
+    expectedUpdate.getInstructions().getDesiredState().setTask(makeConfig(JOB_KEY).newBuilder());
+    expectedUpdate.getInstructions().getInitialState()
+        .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
+    SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
+    builder.add(Op.saveJobUpdate(saveUpdate));
+    storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));
+
+    SaveJobUpdateEvent saveUpdateEvent =
+        new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder());
+    builder.add(Op.saveJobUpdateEvent(saveUpdateEvent));
+    storageUtil.jobUpdateStore.saveJobUpdateEvent(
+        UPDATE_ID,
+        IJobUpdateEvent.build(saveUpdateEvent.getEvent()));
+
+    SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent(
+        new JobInstanceUpdateEvent(),
+        UPDATE_ID.newBuilder());
+    builder.add(Op.saveJobInstanceUpdateEvent(saveInstanceEvent));
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
+        UPDATE_ID,
+        IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()));
+
+    builder.add(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L)));
+    // No expectation - this op is ignored.
+
+    builder.add(Op.removeJobUpdate(
+        new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder()))));
+    storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID));
+
+    expect(persistence.recover()).andReturn(builder.build().stream());
+  }
+
+  /**
+   * Returns a task config in the shape persisted ops carry before backfill. It is currently
+   * identical to the backfilled form. When more fields have to be backfilled, modify this method
+   * so it omits them.
+   */
+  private TaskConfig nonBackfilledConfig() {
+    return makeConfig(JOB_KEY).newBuilder();
+  }
+
+  /**
+   * Test harness for a started {@link DurableStorage}. It records the expectations for starting
+   * storage with nothing to recover, lets subclasses add their own expectations, then starts
+   * storage and runs the test body.
+   *
+   * <p>Every instance must call {@link #run()}. A tear-down check fails the test otherwise.
+   */
+  abstract class AbstractStorageFixture {
+    private final AtomicBoolean runCalled = new AtomicBoolean(false);
+
+    AbstractStorageFixture() {
+      // Prevent otherwise silent noop tests that forget to call run(). A lambda is used so the
+      // tear-down type is inferred from addTearDown's parameter rather than named here.
+      addTearDown(() -> assertTrue(runCalled.get()));
+    }
+
+    /** Records start expectations plus {@link #setupExpectations()}, then starts and tests. */
+    void run() throws Exception {
+      runCalled.set(true);
+
+      // Expect basic start operations.
+
+      // Initialize persistence.
+      persistence.prepare();
+
+      // Replay the ops and perform any supplied initialization work. Here that work is a no-op.
+      // Creating a mock and expecting apply(storeProvider) does not work here for whatever
+      // reason.
+      MutateWork.NoResult.Quiet initializationLogic = storeProvider -> {
+        // No-op.
+      };
+
+      Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
+      storageUtil.storage.write(capture(recoverAndInitializeWork));
+      expectLastCall().andAnswer(() -> {
+        recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
+        return null;
+      });
+
+      expect(persistence.recover()).andReturn(Stream.empty());
+      Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
+      expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
+          () -> {
+            recoveryWork.getValue().apply(storageUtil.mutableStoreProvider);
+            return null;
+          });
+
+      // Setup custom test expectations.
+      setupExpectations();
+
+      control.replay();
+
+      // Start the system.
+      durableStorage.prepare();
+      durableStorage.start(initializationLogic);
+
+      // Exercise the system.
+      runTest();
+    }
+
+    /** Records test-specific expectations before the mocks are replayed. None by default. */
+    protected void setupExpectations() throws Exception {
+      // Default to no expectations.
+    }
+
+    /** Exercises the started storage. */
+    protected abstract void runTest();
+  }
+
+  /**
+   * Fixture whose test body runs {@link #performMutations} inside a single
+   * {@link DurableStorage#write} transaction.
+   */
+  abstract class AbstractMutationFixture extends AbstractStorageFixture {
+    @Override
+    protected void runTest() {
+      durableStorage.write((Quiet) AbstractMutationFixture.this::performMutations);
+    }
+
+    /** Performs the mutations under test against the transaction's store provider. */
+    protected abstract void performMutations(MutableStoreProvider storeProvider);
+  }
+
+  /**
+   * Expects one {@link Persistence#persist} call. The streamed ops must equal {@code op}
+   * followed by {@code ops}, in order.
+   */
+  @SuppressWarnings("unchecked") // The answer casts the raw mock argument to Stream<Op>.
+  private void expectPersist(Op op, Op... ops) {
+    try {
+      // Streams have no value equality. Match any argument, then compare the collected contents
+      // inside the answer.
+      persistence.persist(anyObject());
+      expectLastCall().andAnswer((IAnswer<Void>) () -> {
+        assertEquals(
+            ImmutableList.<Op>builder().add(op).add(ops).build(),
+            ((Stream<Op>) EasyMock.getCurrentArguments()[0]).collect(Collectors.toList()));
+
+        return null;
+      });
+    } catch (Persistence.PersistenceException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testSaveFrameworkId() throws Exception {
+    String frameworkId = "bob";
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.schedulerStore.saveFrameworkId(frameworkId);
+        expectPersist(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getSchedulerStore().saveFrameworkId(frameworkId);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveAcceptedJob() throws Exception {
+    IJobConfiguration jobConfig =
+        IJobConfiguration.build(new JobConfiguration().setKey(JOB_KEY.newBuilder()));
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobStore.saveAcceptedJob(jobConfig);
+        expectPersist(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getCronJobStore().saveAcceptedJob(jobConfig);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testRemoveJob() throws Exception {
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobStore.removeJob(JOB_KEY);
+        expectPersist(Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getCronJobStore().removeJob(JOB_KEY);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveTasks() throws Exception {
+    Set<IScheduledTask> tasks = ImmutableSet.of(task("a", ScheduleStatus.INIT));
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.taskStore.saveTasks(tasks);
+        expectPersist(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(tasks);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testMutateTasks() throws Exception {
+    String taskId = "fred";
+    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+        expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testNestedTransactions() throws Exception {
+    String taskId = "fred";
+    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
+    ImmutableSet<String> tasksToRemove = ImmutableSet.of("b");
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+        storageUtil.taskStore.deleteTasks(tasksToRemove);
+
+        expectPersist(
+            Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))),
+            Op.removeTasks(new RemoveTasks(tasksToRemove)));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+
+        durableStorage.write((NoResult.Quiet)
+            innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveAndMutateTasks() throws Exception {
+    String taskId = "fred";
+    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+    Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT));
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.taskStore.saveTasks(saved);
+
+        // Nested transaction with result.
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+        // Resulting stream operation.
+        expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(saved);
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception {
+    String taskId = "fred";
+    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+    Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT));
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.taskStore.saveTasks(saved);
+
+        // Nested transaction with result.
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+        // Resulting stream operation.
+        expectPersist(Op.saveTasks(new SaveTasks(
+                ImmutableSet.<ScheduledTask>builder()
+                    .addAll(IScheduledTask.toBuildersList(saved))
+                    .add(mutated.get().newBuilder())
+                    .build())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(saved);
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testRemoveTasksQuery() throws Exception {
+    IScheduledTask task = task("a", ScheduleStatus.FINISHED);
+    Set<String> taskIds = Tasks.ids(task);
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.taskStore.deleteTasks(taskIds);
+        expectPersist(Op.removeTasks(new RemoveTasks(taskIds)));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testRemoveTasksIds() throws Exception {
+    Set<String> taskIds = ImmutableSet.of("42");
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.taskStore.deleteTasks(taskIds);
+        expectPersist(Op.removeTasks(new RemoveTasks(taskIds)));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveQuota() throws Exception {
+    String role = "role";
+    IResourceAggregate quota = ResourceTestUtil.aggregate(1.0, 128L, 1024L);
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.quotaStore.saveQuota(role, quota);
+        expectPersist(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getQuotaStore().saveQuota(role, quota);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testRemoveQuota() throws Exception {
+    String role = "role";
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.quotaStore.removeQuota(role);
+        expectPersist(Op.removeQuota(new RemoveQuota(role)));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getQuotaStore().removeQuota(role);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveHostAttributes() throws Exception {
+    String host = "hostname";
+    Set<Attribute> attributes =
+        ImmutableSet.of(new Attribute().setName("attr").setValues(ImmutableSet.of("value")));
+    Optional<IHostAttributes> hostAttributes = Optional.of(
+        IHostAttributes.build(new HostAttributes()
+            .setHost(host)
+            .setAttributes(attributes)));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        expect(storageUtil.attributeStore.getHostAttributes(host))
+            .andReturn(Optional.absent());
+
+        expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
+
+        expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())).andReturn(true);
+        eventSink.post(new PubsubEvent.HostAttributesChanged(hostAttributes.get()));
+        expectPersist(
+            Op.saveHostAttributes(new SaveHostAttributes(hostAttributes.get().newBuilder())));
+
+        expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get()))
+            .andReturn(false);
+
+        expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        AttributeStore.Mutable store = storeProvider.getAttributeStore();
+        assertEquals(Optional.absent(), store.getHostAttributes(host));
+
+        assertTrue(store.saveHostAttributes(hostAttributes.get()));
+
+        assertEquals(hostAttributes, store.getHostAttributes(host));
+
+        assertFalse(store.saveHostAttributes(hostAttributes.get()));
+
+        assertEquals(hostAttributes, store.getHostAttributes(host));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveUpdate() throws Exception {
+    IJobUpdate update = IJobUpdate.build(new JobUpdate()
+        .setSummary(new JobUpdateSummary()
+            .setKey(UPDATE_ID.newBuilder())
+            .setUser("user"))
+        .setInstructions(new JobUpdateInstructions()
+            .setDesiredState(new InstanceTaskConfig()
+                .setTask(new TaskConfig())
+                .setInstances(ImmutableSet.of(new Range(0, 3))))
+            .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
+                .setTask(new TaskConfig())
+                .setInstances(ImmutableSet.of(new Range(0, 3)))))
+            .setSettings(new JobUpdateSettings())));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobUpdateStore.saveJobUpdate(update);
+        expectPersist(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobUpdateStore().saveJobUpdate(update);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveJobUpdateEvent() throws Exception {
+    IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent()
+        .setStatus(JobUpdateStatus.ROLLING_BACK)
+        .setTimestampMs(12345L));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobUpdateStore.saveJobUpdateEvent(UPDATE_ID, event);
+        expectPersist(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(
+            event.newBuilder(),
+            UPDATE_ID.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobUpdateStore().saveJobUpdateEvent(UPDATE_ID, event);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveJobInstanceUpdateEvent() throws Exception {
+    IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(new JobInstanceUpdateEvent()
+        .setAction(JobUpdateAction.INSTANCE_ROLLING_BACK)
+        .setTimestampMs(12345L)
+        .setInstanceId(0));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(UPDATE_ID, event);
+        expectPersist(Op.saveJobInstanceUpdateEvent(
+            new SaveJobInstanceUpdateEvent(
+                event.newBuilder(),
+                UPDATE_ID.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(UPDATE_ID, event);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testRemoveJobUpdates() throws Exception {
+    IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey()
+        .setJob(JOB_KEY.newBuilder())
+        .setId("update-id"));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(key));
+
+        // No transaction is generated since this version is currently in 'read-only'
+        // compatibility mode for this operation type.
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key));
+      }
+    }.run();
+  }
+
+  /** Builds a minimal task with the given ID and status, and an empty task config. */
+  private static IScheduledTask task(String id, ScheduleStatus status) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setAssignedTask(new AssignedTask()
+            .setTaskId(id)
+            .setTask(new TaskConfig())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
new file mode 100644
index 0000000..219576b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.Resource;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.diskMb;
+import static org.apache.aurora.gen.Resource.namedPort;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ThriftBackfill}: resource backfill of task configs and resource aggregates,
+ * and the tier/production reconciliation it performs through {@link TierManager}.
+ */
+public class ThriftBackfillTest extends EasyMockTest {
+
+  private ThriftBackfill thriftBackfill;
+  private TierManager tierManager;
+
+  @Before
+  public void setUp() {
+    tierManager = createMock(TierManager.class);
+    thriftBackfill = new ThriftBackfill(tierManager);
+  }
+
+  /**
+   * Returns a new set holding 1 CPU, 32 MB RAM and 64 MB disk. A fresh set is built on every
+   * call so that no two fixtures share the same {@link Resource} instances.
+   */
+  private static ImmutableSet<Resource> cpuRamDisk() {
+    return ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64));
+  }
+
+  @Test
+  public void testFieldsToSetNoPorts() {
+    TaskConfig config = new TaskConfig()
+        .setResources(cpuRamDisk())
+        .setProduction(false)
+        .setTier("tierName");
+    TaskConfig expected = config.deepCopy().setResources(cpuRamDisk());
+
+    expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.DEV_TIER);
+
+    control.replay();
+
+    assertEquals(expected, thriftBackfill.backfillTask(config));
+  }
+
+  @Test
+  public void testResourceAggregateFieldsToSet() {
+    control.replay();
+
+    // Only the scalar numCpus/ramMb/diskMb fields are set; the resource set must be derived.
+    ResourceAggregate aggregate = new ResourceAggregate()
+        .setNumCpus(1.0)
+        .setRamMb(32)
+        .setDiskMb(64);
+
+    IResourceAggregate expected =
+        IResourceAggregate.build(aggregate.deepCopy().setResources(cpuRamDisk()));
+
+    assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
+  }
+
+  @Test
+  public void testResourceAggregateSetToFields() {
+    control.replay();
+
+    // Only the resource set is populated; the scalar fields must be derived from it.
+    ResourceAggregate aggregate = new ResourceAggregate().setResources(cpuRamDisk());
+
+    IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
+        .setNumCpus(1.0)
+        .setRamMb(32)
+        .setDiskMb(64));
+
+    assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testResourceAggregateTooManyResources() {
+    control.replay();
+
+    ResourceAggregate aggregate = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64), numCpus(2.0)));
+    ThriftBackfill.backfillResourceAggregate(aggregate);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testResourceAggregateInvalidResources() {
+    control.replay();
+
+    ResourceAggregate aggregate = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), namedPort("http")));
+    ThriftBackfill.backfillResourceAggregate(aggregate);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testResourceAggregateMissingResources() {
+    control.replay();
+
+    ResourceAggregate aggregate = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32)));
+    ThriftBackfill.backfillResourceAggregate(aggregate);
+  }
+
+  @Test
+  public void testBackfillTierProduction() {
+    TaskConfig config = new TaskConfig()
+        .setResources(cpuRamDisk())
+        .setProduction(true)
+        .setTier("tierName");
+    TaskConfig expected = config.deepCopy().setResources(cpuRamDisk());
+
+    expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.PREFERRED_TIER);
+
+    control.replay();
+
+    assertEquals(expected, thriftBackfill.backfillTask(config));
+  }
+
+  @Test
+  public void testBackfillTierNotProduction() {
+    TaskConfig config = new TaskConfig()
+        .setResources(cpuRamDisk())
+        .setProduction(true)
+        .setTier("tierName");
+    TaskConfig configWithBackfilledResources = config.deepCopy().setResources(cpuRamDisk());
+
+    // The task is marked as production, but its tier resolves to DEV_TIER.
+    expect(tierManager.getTier(ITaskConfig.build(configWithBackfilledResources)))
+        .andReturn(TaskTestUtil.DEV_TIER);
+
+    control.replay();
+
+    // Backfill is expected to clear the production flag for the DEV_TIER task.
+    TaskConfig expected = configWithBackfilledResources.deepCopy().setProduction(false);
+
+    assertEquals(expected, thriftBackfill.backfillTask(config));
+  }
+
+  @Test
+  public void testBackfillTierSetsTierToPreemptible() {
+    // Neither a tier nor the production flag is set.
+    TaskConfig config = new TaskConfig().setResources(cpuRamDisk());
+    TaskConfig configWithBackfilledResources = config.deepCopy().setResources(cpuRamDisk());
+
+    expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
+
+    control.replay();
+
+    TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preemptible");
+
+    assertEquals(expected, thriftBackfill.backfillTask(config));
+  }
+
+  @Test
+  public void testBackfillTierSetsTierToPreferred() {
+    // No tier is set, but the task is marked as production.
+    TaskConfig config = new TaskConfig()
+        .setResources(cpuRamDisk())
+        .setProduction(true);
+    TaskConfig configWithBackfilledResources = config.deepCopy().setResources(cpuRamDisk());
+
+    expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
+
+    control.replay();
+
+    TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preferred");
+
+    assertEquals(expected, thriftBackfill.backfillTask(config));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testBackfillTierBadTierConfiguration() {
+    TaskConfig config = new TaskConfig().setResources(cpuRamDisk());
+
+    // An empty tier configuration is expected to be rejected.
+    expect(tierManager.getTiers()).andReturn(ImmutableMap.of());
+
+    control.replay();
+
+    thriftBackfill.backfillTask(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
new file mode 100644
index 0000000..cbad3eb
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionRecorder}, focusing on how recorded ops are coalesced.
+ */
+public class TransactionRecorderTest {
+  @Test
+  public void testCoalesce() {
+    // No coalescing - different operation types.
+    assertEquals(
+        ImmutableList.of(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("4", "5"))),
+        record(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("4", "5"))));
+
+    // Consecutive RemoveTasks ops are merged into one.
+    assertEquals(
+        ImmutableList.of(Op.removeTasks(createRemoveTasks("1", "2", "3", "4"))),
+        record(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.removeTasks(createRemoveTasks("3", "4"))));
+
+    // Consecutive SaveTasks ops are merged into one, with task "1" present only once.
+    assertEquals(
+        ImmutableList.of(Op.saveTasks(createSaveTasks("3", "2", "1"))),
+        record(Op.saveTasks(createSaveTasks("1", "2")), Op.saveTasks(createSaveTasks("1", "3"))));
+
+    assertEquals(
+        ImmutableList.of(Op.removeTasks(createRemoveTasks("3", "4", "5"))),
+        record(
+            Op.removeTasks(createRemoveTasks("3")),
+            Op.removeTasks(createRemoveTasks("4", "5"))));
+
+    // Only adjacent ops of the same type are merged; interleaved ops keep their order.
+    assertEquals(
+        ImmutableList.of(
+            Op.saveTasks(createSaveTasks("1", "2", "3")),
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("4", "5")),
+            Op.removeTasks(createRemoveTasks("3", "4", "5"))),
+        record(
+            Op.saveTasks(createSaveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("1", "3")),
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("4", "5")),
+            Op.removeTasks(createRemoveTasks("3")),
+            Op.removeTasks(createRemoveTasks("4", "5"))));
+  }
+
+  /** Feeds {@code ops} in order to a new {@link TransactionRecorder} and returns its ops. */
+  private static List<Op> record(Op... ops) {
+    TransactionRecorder recorder = new TransactionRecorder();
+    Stream.of(ops).forEach(recorder::add);
+    return recorder.getOps();
+  }
+
+  /** Builds a {@link SaveTasks} of minimal tasks that carry only the given task IDs. */
+  private static SaveTasks createSaveTasks(String... taskIds) {
+    return new SaveTasks().setTasks(
+        Stream.of(taskIds)
+            .map(id -> new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(id)))
+            .collect(Collectors.toSet()));
+  }
+
+  /** Builds a {@link RemoveTasks} for the given task IDs. */
+  private static RemoveTasks createRemoveTasks(String... taskIds) {
+    return new RemoveTasks(ImmutableSet.copyOf(taskIds));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
new file mode 100644
index 0000000..e8b564b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link WriteAheadStorage}: mutations are forwarded to the wrapped stores and, where
+ * applicable, recorded as {@link Op}s through the {@link TransactionManager}.
+ */
+public class WriteAheadStorageTest extends EasyMockTest {
+
+  private TransactionManager transactionManager;
+  private TaskStore.Mutable taskStore;
+  private AttributeStore.Mutable attributeStore;
+  private JobUpdateStore.Mutable jobUpdateStore;
+  private EventSink eventSink;
+  private WriteAheadStorage storage;
+
+  @Before
+  public void setUp() {
+    transactionManager = createMock(TransactionManager.class);
+    taskStore = createMock(TaskStore.Mutable.class);
+    attributeStore = createMock(AttributeStore.Mutable.class);
+    jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
+    eventSink = createMock(EventSink.class);
+
+    storage = new WriteAheadStorage(
+        transactionManager,
+        createMock(SchedulerStore.Mutable.class),
+        createMock(CronJobStore.Mutable.class),
+        taskStore,
+        createMock(QuotaStore.Mutable.class),
+        attributeStore,
+        jobUpdateStore,
+        LoggerFactory.getLogger(WriteAheadStorageTest.class),
+        eventSink);
+  }
+
+  /** Expects an active-transaction check followed by {@code op} being logged. */
+  private void expectOp(Op op) {
+    expect(transactionManager.hasActiveTransaction()).andReturn(true);
+    transactionManager.log(op);
+  }
+
+  @Test
+  public void testRemoveUpdates() {
+    Set<IJobUpdateKey> removed = ImmutableSet.of(
+        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")),
+        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b")));
+    jobUpdateStore.removeJobUpdates(removed);
+    // No operation is written since this Op is in read-only compatibility mode.
+
+    control.replay();
+
+    storage.removeJobUpdates(removed);
+  }
+
+  @Test
+  public void testMutate() {
+    String taskId = "a";
+    Function<IScheduledTask, IScheduledTask> mutator =
+        createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
+    Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
+
+    // The mutated task is expected to be logged as a SaveTasks op.
+    expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
+    expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+
+    control.replay();
+
+    assertEquals(mutated, storage.mutateTask(taskId, mutator));
+  }
+
+  @Test
+  public void testSaveHostAttributes() {
+    IHostAttributes attributes = IHostAttributes.build(
+        new HostAttributes()
+            .setHost("a")
+            .setMode(MaintenanceMode.DRAINING)
+            .setAttributes(ImmutableSet.of(
+                new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
+
+    // First save reports a change: the op is logged and a change event is posted.
+    expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
+    expectOp(Op.saveHostAttributes(
+        new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
+    eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
+
+    // Second save reports no change: no op or event is expected for it.
+    expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
+
+    control.replay();
+
+    assertTrue(storage.saveHostAttributes(attributes));
+    assertFalse(storage.saveHostAttributes(attributes));
+  }
+
+  // The delete* operations below are expected to be unsupported on this storage.
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteAllTasks() {
+    control.replay();
+    storage.deleteAllTasks();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteHostAttributes() {
+    control.replay();
+    storage.deleteHostAttributes();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteJobs() {
+    control.replay();
+    storage.deleteJobs();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteQuotas() {
+    control.replay();
+    storage.deleteQuotas();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteAllUpdates() {
+    control.replay();
+    storage.deleteAllUpdates();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
index 3f44559..cb38f10 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
@@ -21,7 +21,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.function.Consumer;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
@@ -37,10 +36,8 @@ import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.storage.DeduplicatedSnapshot;
 import org.apache.aurora.gen.storage.Frame;
 import org.apache.aurora.gen.storage.FrameChunk;
@@ -48,9 +45,7 @@ import org.apache.aurora.gen.storage.FrameHeader;
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.Op;
 import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveTasks;
 import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveTasks;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.Transaction;
 import org.apache.aurora.gen.storage.storageConstants;
@@ -112,11 +107,11 @@ public class LogManagerTest extends EasyMockTest {
   public void testStreamManagerReadFromUnknownNone() throws CodingException {
     expect(stream.readAll()).andReturn(Collections.emptyIterator());
 
-    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
-
     control.replay();
 
-    createNoMessagesStreamManager().readFromBeginning(reader);
+    assertEquals(
+        ImmutableList.of(),
+        ImmutableList.copyOf(createNoMessagesStreamManager().readFromBeginning()));
   }
 
   @Test
@@ -127,12 +122,11 @@ public class LogManagerTest extends EasyMockTest {
     expect(entry1.contents()).andReturn(encode(transaction1));
     expect(stream.readAll()).andReturn(Iterators.singletonIterator(entry1));
 
-    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
-    reader.accept(transaction1);
-
     control.replay();
 
-    createNoMessagesStreamManager().readFromBeginning(reader);
+    assertEquals(
+        ImmutableList.of(transaction1),
+        ImmutableList.copyOf(createNoMessagesStreamManager().readFromBeginning()));
   }
 
   @Test
@@ -214,50 +208,6 @@ public class LogManagerTest extends EasyMockTest {
   }
 
   @Test
-  public void testCoalesce() throws CodingException {
-    SaveTasks saveTasks1 = createSaveTasks("1", "2");
-    createSaveTasks("2");
-    SaveTasks saveTasks2 = createSaveTasks("1", "3");
-    SaveTasks saveTasks3 = createSaveTasks("4", "5");
-
-    // saveTasks1 is unrepresented because both of its operations were trumped.
-    // saveTasks3 is unrepresented because its operations were deleted.
-    SaveTasks coalescedSaves = createSaveTasks("3", "2", "1");
-
-    RemoveTasks removeTasks1 = createRemoveTasks("1", "2");
-    RemoveTasks removeTasks2 = createRemoveTasks("3");
-    RemoveTasks removeTasks3 = createRemoveTasks("4", "5");
-
-    RemoveTasks coalescedRemoves =
-        new RemoveTasks(ImmutableSet.copyOf(Iterables.concat(removeTasks2.getTaskIds(),
-            removeTasks3.getTaskIds())));
-
-    expectAppend(position1,
-        createLogEntry(
-            Op.saveTasks(coalescedSaves),
-            Op.removeTasks(removeTasks1),
-            Op.saveTasks(saveTasks3),
-            Op.removeTasks(coalescedRemoves)));
-
-    control.replay();
-
-    StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction();
-
-    // The next 2 saves should coalesce
-    streamTransaction.add(Op.saveTasks(saveTasks1));
-    streamTransaction.add(Op.saveTasks(saveTasks2));
-
-    streamTransaction.add(Op.removeTasks(removeTasks1));
-    streamTransaction.add(Op.saveTasks(saveTasks3));
-
-    // The next 2 removes should coalesce
-    streamTransaction.add(Op.removeTasks(removeTasks2));
-    streamTransaction.add(Op.removeTasks(removeTasks3));
-
-    assertEquals(position1, streamTransaction.commit());
-  }
-
-  @Test
   public void testTransactionSnapshot() throws CodingException {
     Snapshot snapshot = createSnapshot();
     DeduplicatedSnapshot deduplicated = new SnapshotDeduplicatorImpl().deduplicate(snapshot);
@@ -469,14 +419,12 @@ public class LogManagerTest extends EasyMockTest {
 
     expect(stream.readAll()).andReturn(entries.iterator());
 
-    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
-    reader.accept(transaction1);
-    reader.accept(transaction2);
-
     StreamManager streamManager = createStreamManager(message.chunkSize);
     control.replay();
 
-    streamManager.readFromBeginning(reader);
+    assertEquals(
+        ImmutableList.of(transaction1, transaction2),
+        ImmutableList.copyOf(streamManager.readFromBeginning()));
   }
 
   @Test
@@ -494,9 +442,6 @@ public class LogManagerTest extends EasyMockTest {
 
     expect(stream.readAll()).andReturn(ImmutableList.of(snapshotEntry).iterator());
 
-    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
-    reader.accept(snapshotLogEntry);
-
     control.replay();
 
     HashFunction md5 = Hashing.md5();
@@ -506,7 +451,9 @@ public class LogManagerTest extends EasyMockTest {
         md5,
         new SnapshotDeduplicatorImpl());
     streamManager.snapshot(snapshot);
-    streamManager.readFromBeginning(reader);
+    assertEquals(
+        ImmutableList.of(snapshotLogEntry),
+        ImmutableList.copyOf(streamManager.readFromBeginning()));
   }
 
   private Snapshot createSnapshot() {
@@ -517,15 +464,6 @@ public class LogManagerTest extends EasyMockTest {
         .setTasks(ImmutableSet.of(TaskTestUtil.makeTask("task_id", TaskTestUtil.JOB).newBuilder()));
   }
 
-  private SaveTasks createSaveTasks(String... taskIds) {
-    return new SaveTasks(ImmutableSet.copyOf(Iterables.transform(ImmutableList.copyOf(taskIds),
-        taskId -> new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(taskId)))));
-  }
-
-  private RemoveTasks createRemoveTasks(String... taskIds) {
-    return new RemoveTasks(ImmutableSet.copyOf(taskIds));
-  }
-
   private void expectFrames(Position position, Message message) throws CodingException {
     expect(stream.append(entryEq(message.header))).andReturn(position);
     for (LogEntry chunk : message.chunks) {