Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/03 03:59:11 UTC

[2/4] aurora git commit: Extract a storage Persistence layer
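
The new tests below exercise a Persistence contract with prepare(), recover(), and persist(Stream<Op>) operations plus a nested PersistenceException. Here is a minimal sketch of that contract, inferred only from the test expectations in DurableStorageTest (the signatures in the committed interface may differ):

    package org.apache.aurora.scheduler.storage.durability;

    import java.util.stream.Stream;

    import org.apache.aurora.gen.storage.Op;

    // Sketch only: the shape is inferred from DurableStorageTest expectations,
    // not copied from the committed file.
    public interface Persistence {

      // Prepare the persistence layer before recovery (e.g. open underlying resources).
      void prepare();

      // Replay every previously persisted Op so in-memory storage can be rebuilt on start.
      Stream<Op> recover() throws PersistenceException;

      // Durably record the Ops that make up one write transaction.
      void persist(Stream<Op> mutations) throws PersistenceException;

      // Failure to recover or persist Ops.
      class PersistenceException extends Exception {
        public PersistenceException(String message, Throwable cause) {
          super(message, cause);
        }
      }
    }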

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
new file mode 100644
index 0000000..07912b6
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
@@ -0,0 +1,781 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.testing.TearDown;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveJobUpdates;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.resources.ResourceTestUtil;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.diskMb;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeConfig;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DurableStorageTest extends EasyMockTest {
+
+  private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "name");
+  private static final IJobUpdateKey UPDATE_ID =
+      IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "testUpdateId"));
+
+  private DurableStorage durableStorage;
+  private Persistence persistence;
+  private StorageTestUtil storageUtil;
+  private EventSink eventSink;
+
+  @Before
+  public void setUp() {
+    persistence = createMock(Persistence.class);
+    storageUtil = new StorageTestUtil(this);
+    eventSink = createMock(EventSink.class);
+
+    durableStorage = new DurableStorage(
+        persistence,
+        storageUtil.storage,
+        storageUtil.schedulerStore,
+        storageUtil.jobStore,
+        storageUtil.taskStore,
+        storageUtil.quotaStore,
+        storageUtil.attributeStore,
+        storageUtil.jobUpdateStore,
+        eventSink,
+        new ReentrantLock(),
+        TaskTestUtil.THRIFT_BACKFILL);
+
+    storageUtil.storage.prepare();
+  }
+
+  @Test
+  public void testStart() throws Exception {
+    // We should initialize persistence.
+    persistence.prepare();
+
+    // Our start should recover persistence and then forward the supplied initialization logic
+    // to the underlying storage's start.
+    AtomicBoolean initialized = new AtomicBoolean(false);
+    MutateWork.NoResult.Quiet initializationLogic = provider -> {
+      // Creating a mock and expecting apply(storeProvider) does not work here for whatever
+      // reason.
+      initialized.set(true);
+    };
+
+    Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
+    storageUtil.storage.write(capture(recoverAndInitializeWork));
+    expectLastCall().andAnswer(() -> {
+      recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
+      return null;
+    });
+
+    Capture<MutateWork<Void, RuntimeException>> initializationWork = createCapture();
+    expect(storageUtil.storage.write(capture(initializationWork))).andAnswer(
+        () -> {
+          initializationWork.getValue().apply(storageUtil.mutableStoreProvider);
+          return null;
+        });
+
+    // Populate all Op types.
+    buildReplayOps();
+
+    control.replay();
+
+    durableStorage.prepare();
+    durableStorage.start(initializationLogic);
+    assertTrue(initialized.get());
+
+    // Assert that every Op type has a replay handler defined.
+    assertEquals(
+        EnumSet.allOf(Op._Fields.class),
+        EnumSet.copyOf(durableStorage.buildTransactionReplayActions().keySet()));
+  }
+
+  private void buildReplayOps() throws Exception {
+    ImmutableSet.Builder<Op> builder = ImmutableSet.builder();
+
+    builder.add(Op.saveFrameworkId(new SaveFrameworkId("bob")));
+    storageUtil.schedulerStore.saveFrameworkId("bob");
+
+    JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig());
+    JobConfiguration expectedJob =
+        new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder());
+    SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob);
+    builder.add(Op.saveCronJob(cronJob));
+    storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob));
+
+    RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
+    builder.add(Op.removeJob(removeJob));
+    storageUtil.jobStore.removeJob(JOB_KEY);
+
+    ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder();
+    actualTask.getAssignedTask().setTask(nonBackfilledConfig());
+    IScheduledTask expectedTask = makeTask("id", JOB_KEY);
+    SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask));
+    builder.add(Op.saveTasks(saveTasks));
+    storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask));
+
+    RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
+    builder.add(Op.removeTasks(removeTasks));
+    storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
+
+    ResourceAggregate nonBackfilled = new ResourceAggregate()
+        .setNumCpus(1.0)
+        .setRamMb(32)
+        .setDiskMb(64);
+    SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled);
+    builder.add(Op.saveQuota(saveQuota));
+    storageUtil.quotaStore.saveQuota(
+        saveQuota.getRole(),
+        IResourceAggregate.build(nonBackfilled.deepCopy()
+            .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))));
+
+    builder.add(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole())));
+    storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
+
+    // This entry lacks a slave ID, and should therefore be discarded.
+    SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
+        .setHost("host1")
+        .setMode(MaintenanceMode.DRAINED));
+    builder.add(Op.saveHostAttributes(hostAttributes1));
+
+    SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
+        .setHost("host2")
+        .setSlaveId("slave2")
+        .setMode(MaintenanceMode.DRAINED));
+    builder.add(Op.saveHostAttributes(hostAttributes2));
+    expect(storageUtil.attributeStore.saveHostAttributes(
+        IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true);
+
+    JobUpdate actualUpdate = new JobUpdate()
+        .setSummary(new JobUpdateSummary().setKey(UPDATE_ID.newBuilder()))
+        .setInstructions(new JobUpdateInstructions()
+            .setInitialState(
+                ImmutableSet.of(new InstanceTaskConfig().setTask(nonBackfilledConfig())))
+            .setDesiredState(new InstanceTaskConfig().setTask(nonBackfilledConfig())));
+    JobUpdate expectedUpdate = actualUpdate.deepCopy();
+    expectedUpdate.getInstructions().getDesiredState().setTask(makeConfig(JOB_KEY).newBuilder());
+    expectedUpdate.getInstructions().getInitialState()
+        .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
+    SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
+    builder.add(Op.saveJobUpdate(saveUpdate));
+    storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));
+
+    SaveJobUpdateEvent saveUpdateEvent =
+        new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder());
+    builder.add(Op.saveJobUpdateEvent(saveUpdateEvent));
+    storageUtil.jobUpdateStore.saveJobUpdateEvent(
+        UPDATE_ID,
+        IJobUpdateEvent.build(saveUpdateEvent.getEvent()));
+
+    SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent(
+        new JobInstanceUpdateEvent(),
+        UPDATE_ID.newBuilder());
+    builder.add(Op.saveJobInstanceUpdateEvent(saveInstanceEvent));
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
+        UPDATE_ID,
+        IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()));
+
+    builder.add(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L)));
+    // No expectation - this op is ignored.
+
+    builder.add(Op.removeJobUpdate(
+        new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder()))));
+    storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID));
+
+    expect(persistence.recover()).andReturn(builder.build().stream());
+  }
+
+  private TaskConfig nonBackfilledConfig() {
+    // When more fields have to be backfilled, modify this method.
+    return makeConfig(JOB_KEY).newBuilder();
+  }
+
+  abstract class AbstractStorageFixture {
+    private final AtomicBoolean runCalled = new AtomicBoolean(false);
+
+    AbstractStorageFixture() {
+      // Prevent otherwise silent noop tests that forget to call run().
+      addTearDown(new TearDown() {
+        @Override
+        public void tearDown() {
+          assertTrue(runCalled.get());
+        }
+      });
+    }
+
+    void run() throws Exception {
+      runCalled.set(true);
+
+      // Expect basic start operations.
+
+      // Initialize persistence.
+      persistence.prepare();
+
+      // Replay the ops and perform any supplied initialization work.
+      // Simulate no-op initialization work.
+      // Creating a mock and expecting apply(storeProvider) does not work here for whatever
+      // reason.
+      MutateWork.NoResult.Quiet initializationLogic = storeProvider -> {
+        // No-op.
+      };
+
+      Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
+      storageUtil.storage.write(capture(recoverAndInitializeWork));
+      expectLastCall().andAnswer(() -> {
+        recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
+        return null;
+      });
+
+      expect(persistence.recover()).andReturn(Stream.empty());
+      Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
+      expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
+          () -> {
+            recoveryWork.getValue().apply(storageUtil.mutableStoreProvider);
+            return null;
+          });
+
+      // Set up custom test expectations.
+      setupExpectations();
+
+      control.replay();
+
+      // Start the system.
+      durableStorage.prepare();
+      durableStorage.start(initializationLogic);
+
+      // Exercise the system.
+      runTest();
+    }
+
+    protected void setupExpectations() throws Exception {
+      // Default to no expectations.
+    }
+
+    protected abstract void runTest();
+  }
+
+  abstract class AbstractMutationFixture extends AbstractStorageFixture {
+    @Override
+    protected void runTest() {
+      durableStorage.write((Quiet) AbstractMutationFixture.this::performMutations);
+    }
+
+    protected abstract void performMutations(MutableStoreProvider storeProvider);
+  }
+
+  private void expectPersist(Op op, Op... ops) {
+    try {
+      // Workaround for comparing streams.
+      persistence.persist(anyObject());
+      expectLastCall().andAnswer((IAnswer<Void>) () -> {
+        assertEquals(
+            ImmutableList.<Op>builder().add(op).add(ops).build(),
+            ((Stream<Op>) EasyMock.getCurrentArguments()[0]).collect(Collectors.toList()));
+
+        return null;
+      });
+    } catch (Persistence.PersistenceException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testSaveFrameworkId() throws Exception {
+    String frameworkId = "bob";
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.schedulerStore.saveFrameworkId(frameworkId);
+        expectPersist(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getSchedulerStore().saveFrameworkId(frameworkId);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveAcceptedJob() throws Exception {
+    IJobConfiguration jobConfig =
+        IJobConfiguration.build(new JobConfiguration().setKey(JOB_KEY.newBuilder()));
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobStore.saveAcceptedJob(jobConfig);
+        expectPersist(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getCronJobStore().saveAcceptedJob(jobConfig);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testRemoveJob() throws Exception {
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobStore.removeJob(JOB_KEY);
+        expectPersist(Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getCronJobStore().removeJob(JOB_KEY);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveTasks() throws Exception {
+    Set<IScheduledTask> tasks = ImmutableSet.of(task("a", ScheduleStatus.INIT));
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.taskStore.saveTasks(tasks);
+        expectPersist(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(tasks);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testMutateTasks() throws Exception {
+    String taskId = "fred";
+    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+        expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testNestedTransactions() throws Exception {
+    String taskId = "fred";
+    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
+    ImmutableSet<String> tasksToRemove = ImmutableSet.of("b");
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+        storageUtil.taskStore.deleteTasks(tasksToRemove);
+
+        expectPersist(
+            Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))),
+            Op.removeTasks(new RemoveTasks(tasksToRemove)));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+
+        durableStorage.write((NoResult.Quiet)
+            innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveAndMutateTasks() throws Exception {
+    String taskId = "fred";
+    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+    Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT));
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.taskStore.saveTasks(saved);
+
+        // Nested transaction with result.
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+        // Resulting stream operation.
+        expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(saved);
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception {
+    String taskId = "fred";
+    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+    Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT));
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.taskStore.saveTasks(saved);
+
+        // Nested transaction with result.
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+        // Resulting stream operation.
+        expectPersist(Op.saveTasks(new SaveTasks(
+                ImmutableSet.<ScheduledTask>builder()
+                    .addAll(IScheduledTask.toBuildersList(saved))
+                    .add(mutated.get().newBuilder())
+                    .build())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(saved);
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testRemoveTasksQuery() throws Exception {
+    IScheduledTask task = task("a", ScheduleStatus.FINISHED);
+    Set<String> taskIds = Tasks.ids(task);
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.taskStore.deleteTasks(taskIds);
+        expectPersist(Op.removeTasks(new RemoveTasks(taskIds)));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testRemoveTasksIds() throws Exception {
+    Set<String> taskIds = ImmutableSet.of("42");
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.taskStore.deleteTasks(taskIds);
+        expectPersist(Op.removeTasks(new RemoveTasks(taskIds)));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveQuota() throws Exception {
+    String role = "role";
+    IResourceAggregate quota = ResourceTestUtil.aggregate(1.0, 128L, 1024L);
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.quotaStore.saveQuota(role, quota);
+        expectPersist(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getQuotaStore().saveQuota(role, quota);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testRemoveQuota() throws Exception {
+    String role = "role";
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.quotaStore.removeQuota(role);
+        expectPersist(Op.removeQuota(new RemoveQuota(role)));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getQuotaStore().removeQuota(role);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveHostAttributes() throws Exception {
+    String host = "hostname";
+    Set<Attribute> attributes =
+        ImmutableSet.of(new Attribute().setName("attr").setValues(ImmutableSet.of("value")));
+    Optional<IHostAttributes> hostAttributes = Optional.of(
+        IHostAttributes.build(new HostAttributes()
+            .setHost(host)
+            .setAttributes(attributes)));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        expect(storageUtil.attributeStore.getHostAttributes(host))
+            .andReturn(Optional.absent());
+
+        expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
+
+        expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())).andReturn(true);
+        eventSink.post(new PubsubEvent.HostAttributesChanged(hostAttributes.get()));
+        expectPersist(
+            Op.saveHostAttributes(new SaveHostAttributes(hostAttributes.get().newBuilder())));
+
+        expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get()))
+            .andReturn(false);
+
+        expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        AttributeStore.Mutable store = storeProvider.getAttributeStore();
+        assertEquals(Optional.absent(), store.getHostAttributes(host));
+
+        assertTrue(store.saveHostAttributes(hostAttributes.get()));
+
+        assertEquals(hostAttributes, store.getHostAttributes(host));
+
+        assertFalse(store.saveHostAttributes(hostAttributes.get()));
+
+        assertEquals(hostAttributes, store.getHostAttributes(host));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveUpdate() throws Exception {
+    IJobUpdate update = IJobUpdate.build(new JobUpdate()
+        .setSummary(new JobUpdateSummary()
+            .setKey(UPDATE_ID.newBuilder())
+            .setUser("user"))
+        .setInstructions(new JobUpdateInstructions()
+            .setDesiredState(new InstanceTaskConfig()
+                .setTask(new TaskConfig())
+                .setInstances(ImmutableSet.of(new Range(0, 3))))
+            .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
+                .setTask(new TaskConfig())
+                .setInstances(ImmutableSet.of(new Range(0, 3)))))
+            .setSettings(new JobUpdateSettings())));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobUpdateStore.saveJobUpdate(update);
+        expectPersist(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobUpdateStore().saveJobUpdate(update);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveJobUpdateEvent() throws Exception {
+    IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent()
+        .setStatus(JobUpdateStatus.ROLLING_BACK)
+        .setTimestampMs(12345L));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobUpdateStore.saveJobUpdateEvent(UPDATE_ID, event);
+        expectPersist(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(
+            event.newBuilder(),
+            UPDATE_ID.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobUpdateStore().saveJobUpdateEvent(UPDATE_ID, event);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testSaveJobInstanceUpdateEvent() throws Exception {
+    IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(new JobInstanceUpdateEvent()
+        .setAction(JobUpdateAction.INSTANCE_ROLLING_BACK)
+        .setTimestampMs(12345L)
+        .setInstanceId(0));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(UPDATE_ID, event);
+        expectPersist(Op.saveJobInstanceUpdateEvent(
+            new SaveJobInstanceUpdateEvent(
+                event.newBuilder(),
+                UPDATE_ID.newBuilder())));
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(UPDATE_ID, event);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testRemoveJobUpdates() throws Exception {
+    IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey()
+        .setJob(JOB_KEY.newBuilder())
+        .setId("update-id"));
+
+    new AbstractMutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWrite();
+        storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(key));
+
+        // No transaction is generated since this version is currently in 'read-only'
+        // compatibility mode for this operation type.
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key));
+      }
+    }.run();
+  }
+
+  private static IScheduledTask task(String id, ScheduleStatus status) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setAssignedTask(new AssignedTask()
+            .setTaskId(id)
+            .setTask(new TaskConfig())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
new file mode 100644
index 0000000..219576b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.diskMb;
+import static org.apache.aurora.gen.Resource.namedPort;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class ThriftBackfillTest extends EasyMockTest {
+
+  private ThriftBackfill thriftBackfill;
+  private TierManager tierManager;
+
+  @Before
+  public void setUp() {
+    tierManager = createMock(TierManager.class);
+    thriftBackfill = new ThriftBackfill(tierManager);
+  }
+
+  @Test
+  public void testFieldsToSetNoPorts() {
+    TaskConfig config = new TaskConfig()
+        .setResources(ImmutableSet.of(
+            numCpus(1.0),
+            ramMb(32),
+            diskMb(64)))
+        .setProduction(false)
+        .setTier("tierName");
+    TaskConfig expected = config.deepCopy()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+    expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.DEV_TIER);
+
+    control.replay();
+
+    assertEquals(
+        expected,
+        thriftBackfill.backfillTask(config));
+  }
+
+  @Test
+  public void testResourceAggregateFieldsToSet() {
+    control.replay();
+
+    ResourceAggregate aggregate = new ResourceAggregate()
+        .setNumCpus(1.0)
+        .setRamMb(32)
+        .setDiskMb(64);
+
+    IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64))));
+
+    assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
+  }
+
+  @Test
+  public void testResourceAggregateSetToFields() {
+    control.replay();
+
+    ResourceAggregate aggregate = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+    IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
+        .setNumCpus(1.0)
+        .setRamMb(32)
+        .setDiskMb(64));
+
+    assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testResourceAggregateTooManyResources() {
+    control.replay();
+
+    ResourceAggregate aggregate = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64), numCpus(2.0)));
+    ThriftBackfill.backfillResourceAggregate(aggregate);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testResourceAggregateInvalidResources() {
+    control.replay();
+
+    ResourceAggregate aggregate = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), namedPort("http")));
+    ThriftBackfill.backfillResourceAggregate(aggregate);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testResourceAggregateMissingResources() {
+    control.replay();
+
+    ResourceAggregate aggregate = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32)));
+    ThriftBackfill.backfillResourceAggregate(aggregate);
+  }
+
+  @Test
+  public void testBackfillTierProduction() {
+    TaskConfig config = new TaskConfig()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
+        .setProduction(true)
+        .setTier("tierName");
+    TaskConfig expected = config.deepCopy()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+    expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.PREFERRED_TIER);
+
+    control.replay();
+
+    assertEquals(
+        expected,
+        thriftBackfill.backfillTask(config));
+  }
+
+  @Test
+  public void testBackfillTierNotProduction() {
+    TaskConfig config = new TaskConfig()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
+        .setProduction(true)
+        .setTier("tierName");
+    TaskConfig configWithBackfilledResources = config.deepCopy()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+    expect(tierManager.getTier(ITaskConfig.build(configWithBackfilledResources)))
+        .andReturn(TaskTestUtil.DEV_TIER);
+
+    control.replay();
+
+    TaskConfig expected = configWithBackfilledResources.deepCopy()
+        .setProduction(false);
+
+    assertEquals(
+        expected,
+        thriftBackfill.backfillTask(config));
+  }
+
+  @Test
+  public void testBackfillTierSetsTierToPreemptible() {
+    TaskConfig config = new TaskConfig()
+        .setResources(ImmutableSet.of(
+            numCpus(1.0),
+            ramMb(32),
+            diskMb(64)));
+    TaskConfig configWithBackfilledResources = config.deepCopy()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+    expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
+
+    control.replay();
+
+    TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preemptible");
+
+    assertEquals(
+        expected,
+        thriftBackfill.backfillTask(config));
+  }
+
+  @Test
+  public void testBackfillTierSetsTierToPreferred() {
+    TaskConfig config = new TaskConfig()
+        .setResources(ImmutableSet.of(
+            numCpus(1.0),
+            ramMb(32),
+            diskMb(64)))
+        .setProduction(true);
+    TaskConfig configWithBackfilledResources = config.deepCopy()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+    expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
+
+    control.replay();
+
+    TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preferred");
+
+    assertEquals(
+        expected,
+        thriftBackfill.backfillTask(config));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testBackfillTierBadTierConfiguration() {
+    TaskConfig config = new TaskConfig()
+        .setResources(ImmutableSet.of(
+            numCpus(1.0),
+            ramMb(32),
+            diskMb(64)));
+
+    expect(tierManager.getTiers()).andReturn(ImmutableMap.of());
+
+    control.replay();
+
+    thriftBackfill.backfillTask(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
new file mode 100644
index 0000000..cbad3eb
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TransactionRecorderTest {
+  @Test
+  public void testCoalesce() throws Exception {
+    // No coalescing - different operation types.
+    assertEquals(
+        ImmutableList.of(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("4", "5"))),
+        record(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("4", "5"))));
+
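+    // Adjacent removes coalesce into a single RemoveTasks op.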
+    assertEquals(
+        ImmutableList.of(Op.removeTasks(createRemoveTasks("1", "2", "3", "4"))),
+        record(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.removeTasks(createRemoveTasks("3", "4"))));
+
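+    // Adjacent saves coalesce; a later save of a task id supersedes an earlier save of the same id.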
+    assertEquals(
+        ImmutableList.of(Op.saveTasks(createSaveTasks("3", "2", "1"))),
+        record(Op.saveTasks(createSaveTasks("1", "2")), Op.saveTasks(createSaveTasks("1", "3"))));
+
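+    // Remove coalescing also merges batches of different sizes.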
+    assertEquals(
+        ImmutableList.of(Op.removeTasks(createRemoveTasks("3", "4", "5"))),
+        record(
+            Op.removeTasks(createRemoveTasks("3")),
+            Op.removeTasks(createRemoveTasks("4", "5"))));
+  }
+
+  private static List<Op> record(Op... ops) {
+    TransactionRecorder recorder = new TransactionRecorder();
+    Stream.of(ops).forEach(recorder::add);
+    return recorder.getOps();
+  }
+
+  private static SaveTasks createSaveTasks(String... taskIds) {
+    return new SaveTasks().setTasks(
+        Stream.of(taskIds)
+            .map(id -> new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(id)))
+            .collect(Collectors.toSet())
+    );
+  }
+
+  private RemoveTasks createRemoveTasks(String... taskIds) {
+    return new RemoveTasks(ImmutableSet.copyOf(taskIds));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
new file mode 100644
index 0000000..e8b564b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class WriteAheadStorageTest extends EasyMockTest {
+
+  private TransactionManager transactionManager;
+  private TaskStore.Mutable taskStore;
+  private AttributeStore.Mutable attributeStore;
+  private JobUpdateStore.Mutable jobUpdateStore;
+  private EventSink eventSink;
+  private WriteAheadStorage storage;
+
+  @Before
+  public void setUp() {
+    transactionManager = createMock(TransactionManager.class);
+    taskStore = createMock(TaskStore.Mutable.class);
+    attributeStore = createMock(AttributeStore.Mutable.class);
+    jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
+    eventSink = createMock(EventSink.class);
+
+    storage = new WriteAheadStorage(
+        transactionManager,
+        createMock(SchedulerStore.Mutable.class),
+        createMock(CronJobStore.Mutable.class),
+        taskStore,
+        createMock(QuotaStore.Mutable.class),
+        attributeStore,
+        jobUpdateStore,
+        LoggerFactory.getLogger(WriteAheadStorageTest.class),
+        eventSink);
+  }
+
+  private void expectOp(Op op) {
+    expect(transactionManager.hasActiveTransaction()).andReturn(true);
+    transactionManager.log(op);
+  }
+
+  @Test
+  public void testRemoveUpdates() {
+    Set<IJobUpdateKey> removed = ImmutableSet.of(
+        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")),
+        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b")));
+    jobUpdateStore.removeJobUpdates(removed);
+    // No operation is written since this Op is in read-only compatibility mode.
+
+    control.replay();
+
+    storage.removeJobUpdates(removed);
+  }
+
+  @Test
+  public void testMutate() {
+    String taskId = "a";
+    Function<IScheduledTask, IScheduledTask> mutator =
+        createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
+    Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
+
+    expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
+    expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+
+    control.replay();
+
+    assertEquals(mutated, storage.mutateTask(taskId, mutator));
+  }
+
+  @Test
+  public void testSaveHostAttributes() {
+    IHostAttributes attributes = IHostAttributes.build(
+        new HostAttributes()
+            .setHost("a")
+            .setMode(MaintenanceMode.DRAINING)
+            .setAttributes(ImmutableSet.of(
+                new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
+
+    expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
+    expectOp(Op.saveHostAttributes(
+        new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
+    eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
+
+    expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
+
+    control.replay();
+
+    assertTrue(storage.saveHostAttributes(attributes));
+
+    assertFalse(storage.saveHostAttributes(attributes));
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteAllTasks() {
+    control.replay();
+    storage.deleteAllTasks();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteHostAttributes() {
+    control.replay();
+    storage.deleteHostAttributes();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteJobs() {
+    control.replay();
+    storage.deleteJobs();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteQuotas() {
+    control.replay();
+    storage.deleteQuotas();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteAllUpdatesAndEvents() {
+    control.replay();
+    storage.deleteAllUpdates();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
index 3f44559..cb38f10 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
@@ -21,7 +21,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.function.Consumer;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
@@ -37,10 +36,8 @@ import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.storage.DeduplicatedSnapshot;
 import org.apache.aurora.gen.storage.Frame;
 import org.apache.aurora.gen.storage.FrameChunk;
@@ -48,9 +45,7 @@ import org.apache.aurora.gen.storage.FrameHeader;
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.Op;
 import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveTasks;
 import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveTasks;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.Transaction;
 import org.apache.aurora.gen.storage.storageConstants;
@@ -112,11 +107,11 @@ public class LogManagerTest extends EasyMockTest {
   public void testStreamManagerReadFromUnknownNone() throws CodingException {
     expect(stream.readAll()).andReturn(Collections.emptyIterator());
 
-    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
-
     control.replay();
 
-    createNoMessagesStreamManager().readFromBeginning(reader);
+    assertEquals(
+        ImmutableList.of(),
+        ImmutableList.copyOf(createNoMessagesStreamManager().readFromBeginning()));
   }
 
   @Test
@@ -127,12 +122,11 @@ public class LogManagerTest extends EasyMockTest {
     expect(entry1.contents()).andReturn(encode(transaction1));
     expect(stream.readAll()).andReturn(Iterators.singletonIterator(entry1));
 
-    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
-    reader.accept(transaction1);
-
     control.replay();
 
-    createNoMessagesStreamManager().readFromBeginning(reader);
+    assertEquals(
+        ImmutableList.of(transaction1),
+        ImmutableList.copyOf(createNoMessagesStreamManager().readFromBeginning()));
   }
 
   @Test
@@ -214,50 +208,6 @@ public class LogManagerTest extends EasyMockTest {
   }
 
   @Test
-  public void testCoalesce() throws CodingException {
-    SaveTasks saveTasks1 = createSaveTasks("1", "2");
-    createSaveTasks("2");
-    SaveTasks saveTasks2 = createSaveTasks("1", "3");
-    SaveTasks saveTasks3 = createSaveTasks("4", "5");
-
-    // saveTasks1 is unrepresented because both of its operations were trumped.
-    // saveTasks3 is unrepresented because its operations were deleted.
-    SaveTasks coalescedSaves = createSaveTasks("3", "2", "1");
-
-    RemoveTasks removeTasks1 = createRemoveTasks("1", "2");
-    RemoveTasks removeTasks2 = createRemoveTasks("3");
-    RemoveTasks removeTasks3 = createRemoveTasks("4", "5");
-
-    RemoveTasks coalescedRemoves =
-        new RemoveTasks(ImmutableSet.copyOf(Iterables.concat(removeTasks2.getTaskIds(),
-            removeTasks3.getTaskIds())));
-
-    expectAppend(position1,
-        createLogEntry(
-            Op.saveTasks(coalescedSaves),
-            Op.removeTasks(removeTasks1),
-            Op.saveTasks(saveTasks3),
-            Op.removeTasks(coalescedRemoves)));
-
-    control.replay();
-
-    StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction();
-
-    // The next 2 saves should coalesce
-    streamTransaction.add(Op.saveTasks(saveTasks1));
-    streamTransaction.add(Op.saveTasks(saveTasks2));
-
-    streamTransaction.add(Op.removeTasks(removeTasks1));
-    streamTransaction.add(Op.saveTasks(saveTasks3));
-
-    // The next 2 removes should coalesce
-    streamTransaction.add(Op.removeTasks(removeTasks2));
-    streamTransaction.add(Op.removeTasks(removeTasks3));
-
-    assertEquals(position1, streamTransaction.commit());
-  }
-
-  @Test
   public void testTransactionSnapshot() throws CodingException {
     Snapshot snapshot = createSnapshot();
     DeduplicatedSnapshot deduplicated = new SnapshotDeduplicatorImpl().deduplicate(snapshot);
@@ -469,14 +419,12 @@ public class LogManagerTest extends EasyMockTest {
 
     expect(stream.readAll()).andReturn(entries.iterator());
 
-    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
-    reader.accept(transaction1);
-    reader.accept(transaction2);
-
     StreamManager streamManager = createStreamManager(message.chunkSize);
     control.replay();
 
-    streamManager.readFromBeginning(reader);
+    assertEquals(
+        ImmutableList.of(transaction1, transaction2),
+        ImmutableList.copyOf(streamManager.readFromBeginning()));
   }
 
   @Test
@@ -494,9 +442,6 @@ public class LogManagerTest extends EasyMockTest {
 
     expect(stream.readAll()).andReturn(ImmutableList.of(snapshotEntry).iterator());
 
-    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
-    reader.accept(snapshotLogEntry);
-
     control.replay();
 
     HashFunction md5 = Hashing.md5();
@@ -506,7 +451,9 @@ public class LogManagerTest extends EasyMockTest {
         md5,
         new SnapshotDeduplicatorImpl());
     streamManager.snapshot(snapshot);
-    streamManager.readFromBeginning(reader);
+    assertEquals(
+        ImmutableList.of(snapshotLogEntry),
+        ImmutableList.copyOf(streamManager.readFromBeginning()));
   }
 
   private Snapshot createSnapshot() {
@@ -517,15 +464,6 @@ public class LogManagerTest extends EasyMockTest {
         .setTasks(ImmutableSet.of(TaskTestUtil.makeTask("task_id", TaskTestUtil.JOB).newBuilder()));
   }
 
-  private SaveTasks createSaveTasks(String... taskIds) {
-    return new SaveTasks(ImmutableSet.copyOf(Iterables.transform(ImmutableList.copyOf(taskIds),
-        taskId -> new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(taskId)))));
-  }
-
-  private RemoveTasks createRemoveTasks(String... taskIds) {
-    return new RemoveTasks(ImmutableSet.copyOf(taskIds));
-  }
-
   private void expectFrames(Position position, Message message) throws CodingException {
     expect(stream.append(entryEq(message.header))).andReturn(position);
     for (LogEntry chunk : message.chunks) {