You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/11/07 23:07:24 UTC

incubator-aurora git commit: Fixing the log replay for the job update history pruner.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 2d68bc690 -> afee887af


Fixing the log replay for the job update history pruner.

Bugs closed: AURORA-912

Reviewed at https://reviews.apache.org/r/27598/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/afee887a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/afee887a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/afee887a

Branch: refs/heads/master
Commit: afee887af5de085c9a422cb0d63e8a382673b8c5
Parents: 2d68bc6
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Nov 7 14:05:11 2014 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Nov 7 14:05:11 2014 -0800

----------------------------------------------------------------------
 .../scheduler/storage/log/LogStorage.java       | 301 +++++++++++--------
 .../scheduler/storage/log/LogStorageTest.java   | 159 +++++++---
 2 files changed, 298 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/afee887a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index cbab759..0195557 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -19,6 +19,7 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 import java.util.Date;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
@@ -29,9 +30,9 @@ import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+
 import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
 import com.twitter.common.base.Closure;
 import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.quantity.Amount;
@@ -221,10 +222,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   @Qualifier
   public @interface WriteBehind { }
 
-  // TODO(wfarner): This is a temporary emergency fix.  Revisit.
-  @CmdLine(name = "log_storage_fail_on_unknown_op",
-      help = "If true, fail if an unknown log transition operation is encountered.")
-  public static final Arg<Boolean> FAIL_ON_UNKNOWN_OP = Arg.create(false);
+  private final Map<LogEntry._Fields, Closure<LogEntry>> logEntryReplayActions;
+  private final Map<Op._Fields, Closure<Op>> transactionReplayActions;
 
   @Inject
   LogStorage(LogManager logManager,
@@ -306,6 +305,164 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         quotaStore,
         attributeStore,
         jobUpdateStore);
+
+    this.logEntryReplayActions = buildLogEntryReplayActions();
+    this.transactionReplayActions = buildTransactionReplayActions();
+  }
+
+  @VisibleForTesting
+  final Map<LogEntry._Fields, Closure<LogEntry>> buildLogEntryReplayActions() {
+
+    return ImmutableMap.<LogEntry._Fields, Closure<LogEntry>>builder()
+        .put(LogEntry._Fields.SNAPSHOT, new Closure<LogEntry>() {
+          @Override
+          public void execute(LogEntry logEntry) throws RuntimeException {
+            Snapshot snapshot = logEntry.getSnapshot();
+            LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
+            snapshotStore.applySnapshot(snapshot);
+          }
+        })
+        .put(LogEntry._Fields.TRANSACTION, new Closure<LogEntry>() {
+          @Override
+          public void execute(final LogEntry logEntry) throws RuntimeException {
+            write(new MutateWork.NoResult.Quiet() {
+              @Override
+              protected void execute(MutableStoreProvider unused) {
+                for (Op op : logEntry.getTransaction().getOps()) {
+                  replayOp(op);
+                }
+              }
+            });
+          }
+        })
+        .put(LogEntry._Fields.NOOP, new Closure<LogEntry>() {
+          @Override
+          public void execute(LogEntry item) throws RuntimeException {
+            // Nothing to do here
+          }
+        }).build();
+  }
+
+  @VisibleForTesting
+  final Map<Op._Fields, Closure<Op>> buildTransactionReplayActions() {
+
+    return ImmutableMap.<Op._Fields, Closure<Op>>builder()
+        .put(Op._Fields.SAVE_FRAMEWORK_ID, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId());
+          }
+        })
+        .put(Op._Fields.SAVE_ACCEPTED_JOB, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            SaveAcceptedJob acceptedJob = op.getSaveAcceptedJob();
+            writeBehindJobStore.saveAcceptedJob(
+                acceptedJob.getManagerId(),
+                IJobConfiguration.build(acceptedJob.getJobConfig()));
+          }
+        })
+        .put(Op._Fields.REMOVE_JOB, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
+          }
+        })
+        .put(Op._Fields.SAVE_TASKS, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindTaskStore.saveTasks(
+                IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
+          }
+        })
+        .put(Op._Fields.REWRITE_TASK, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            RewriteTask rewriteTask = op.getRewriteTask();
+            writeBehindTaskStore.unsafeModifyInPlace(
+                rewriteTask.getTaskId(),
+                ITaskConfig.build(rewriteTask.getTask()));
+          }
+        })
+        .put(Op._Fields.REMOVE_TASKS, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds());
+          }
+        })
+        .put(Op._Fields.SAVE_QUOTA, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            SaveQuota saveQuota = op.getSaveQuota();
+            writeBehindQuotaStore.saveQuota(
+                saveQuota.getRole(),
+                IResourceAggregate.build(saveQuota.getQuota()));
+          }
+        })
+        .put(Op._Fields.REMOVE_QUOTA, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole());
+          }
+        })
+        .put(Op._Fields.SAVE_HOST_ATTRIBUTES, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
+            // Prior to commit 5cf760b, the store would persist maintenance mode changes for
+            // unknown hosts.  5cf760b began rejecting these, but the replicated log may still
+            // contain entries with a null slave ID.
+            if (attributes.isSetSlaveId()) {
+              writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
+            } else {
+              LOG.info("Dropping host attributes with no slave ID: " + attributes);
+            }
+          }
+        })
+        .put(Op._Fields.SAVE_LOCK, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock()));
+          }
+        })
+        .put(Op._Fields.REMOVE_LOCK, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
+          }
+        })
+        .put(Op._Fields.SAVE_JOB_UPDATE, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindJobUpdateStore.saveJobUpdate(
+                IJobUpdate.build(op.getSaveJobUpdate().getJobUpdate()),
+                Optional.fromNullable(op.getSaveJobUpdate().getLockToken()));
+          }
+        })
+        .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindJobUpdateStore.saveJobUpdateEvent(
+                IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()),
+                op.getSaveJobUpdateEvent().getUpdateId());
+          }
+        })
+        .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(IJobInstanceUpdateEvent.build(
+                op.getSaveJobInstanceUpdateEvent().getEvent()),
+                op.getSaveJobInstanceUpdateEvent().getUpdateId());
+          }
+        })
+        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, new Closure<Op>() {
+          @Override
+          public void execute(Op op) throws RuntimeException {
+            writeBehindJobUpdateStore.pruneHistory(
+                op.getPruneJobUpdateHistory().getPerJobRetainCount(),
+                op.getPruneJobUpdateHistory().getHistoryPruneThresholdMs());
+          }
+        }).build();
   }
 
   @Override
@@ -367,132 +524,22 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
     }
   }
 
-  void replay(final LogEntry logEntry) {
-    switch (logEntry.getSetField()) {
-      case SNAPSHOT:
-        Snapshot snapshot = logEntry.getSnapshot();
-        LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
-        snapshotStore.applySnapshot(snapshot);
-        break;
-
-      case TRANSACTION:
-        write(new MutateWork.NoResult.Quiet() {
-          @Override
-          protected void execute(MutableStoreProvider unused) {
-            for (Op op : logEntry.getTransaction().getOps()) {
-              replayOp(op);
-            }
-          }
-        });
-        break;
-
-      case NOOP:
-        // Nothing to do here
-        break;
-
-      case DEFLATED_ENTRY:
-        throw new IllegalArgumentException("Deflated entries are not handled at this layer.");
-
-      case FRAME:
-        throw new IllegalArgumentException("Framed entries are not handled at this layer.");
-
-      case DEDUPLICATED_SNAPSHOT:
-        throw new IllegalArgumentException("Deduplicated snapshots are not handled at this layer.");
-
-      default:
-        throw new IllegalStateException("Unknown log entry type: " + logEntry);
+  private void replay(final LogEntry logEntry) {
+    LogEntry._Fields entryField = logEntry.getSetField();
+    if (!logEntryReplayActions.containsKey(entryField)) {
+      throw new IllegalStateException("Unknown log entry type: " + entryField);
     }
+
+    logEntryReplayActions.get(entryField).execute(logEntry);
   }
 
   private void replayOp(Op op) {
-    switch (op.getSetField()) {
-      case SAVE_FRAMEWORK_ID:
-        writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId());
-        break;
-
-      case SAVE_ACCEPTED_JOB:
-        SaveAcceptedJob acceptedJob = op.getSaveAcceptedJob();
-        writeBehindJobStore.saveAcceptedJob(
-            acceptedJob.getManagerId(),
-            IJobConfiguration.build(acceptedJob.getJobConfig()));
-        break;
-
-      case REMOVE_JOB:
-        writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
-        break;
-
-      case SAVE_TASKS:
-        writeBehindTaskStore.saveTasks(
-            IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
-        break;
-
-      case REWRITE_TASK:
-        RewriteTask rewriteTask = op.getRewriteTask();
-        writeBehindTaskStore.unsafeModifyInPlace(
-            rewriteTask.getTaskId(),
-            ITaskConfig.build(rewriteTask.getTask()));
-        break;
-
-      case REMOVE_TASKS:
-        writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds());
-        break;
-
-      case SAVE_QUOTA:
-        SaveQuota saveQuota = op.getSaveQuota();
-        writeBehindQuotaStore.saveQuota(
-            saveQuota.getRole(),
-            IResourceAggregate.build(saveQuota.getQuota()));
-        break;
-
-      case REMOVE_QUOTA:
-        writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole());
-        break;
-
-      case SAVE_HOST_ATTRIBUTES:
-        HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
-        // Prior to commit 5cf760b, the store would persist maintenance mode changes for
-        // unknown hosts.  5cf760b began rejecting these, but the replicated log may still
-        // contain entries with a null slave ID.
-        if (attributes.isSetSlaveId()) {
-          writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
-        } else {
-          LOG.info("Dropping host attributes with no slave ID: " + attributes);
-        }
-        break;
-
-      case SAVE_LOCK:
-        writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock()));
-        break;
-
-      case REMOVE_LOCK:
-        writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
-        break;
-
-      case SAVE_JOB_UPDATE:
-        writeBehindJobUpdateStore.saveJobUpdate(
-            IJobUpdate.build(op.getSaveJobUpdate().getJobUpdate()),
-            Optional.fromNullable(op.getSaveJobUpdate().getLockToken()));
-        break;
-
-      case SAVE_JOB_UPDATE_EVENT:
-        writeBehindJobUpdateStore.saveJobUpdateEvent(
-            IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()),
-            op.getSaveJobUpdateEvent().getUpdateId());
-        break;
-
-      case SAVE_JOB_INSTANCE_UPDATE_EVENT:
-        writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(IJobInstanceUpdateEvent.build(
-            op.getSaveJobInstanceUpdateEvent().getEvent()),
-            op.getSaveJobInstanceUpdateEvent().getUpdateId());
-        break;
-
-      default:
-        if (FAIL_ON_UNKNOWN_OP.get()) {
-          throw new IllegalStateException("Unknown transaction op: " + op);
-        } else {
-          LOG.log(Level.SEVERE, "Unkown transaction op: " + op);
-        }
+    Op._Fields opField = op.getSetField();
+    if (!transactionReplayActions.containsKey(opField)) {
+      throw new IllegalStateException("Unknown transaction op: " + opField);
     }
+
+    transactionReplayActions.get(opField).execute(op);
   }
 
   private void scheduleSnapshots() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/afee887a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 8eb5c3f..55a4784 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.storage.log;
 
+import java.util.EnumSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -22,9 +23,11 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import com.google.common.testing.TearDown;
+
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
@@ -205,37 +208,6 @@ public class LogStorageTest extends EasyMockTest {
       }
     });
 
-    Entry entry1 = createMock(Entry.class);
-    Entry entry2 = createMock(Entry.class);
-    Entry entry3 = createMock(Entry.class);
-    Entry entry4 = createMock(Entry.class);
-    Entry entry5 = createMock(Entry.class);
-    String frameworkId1 = "bob";
-    LogEntry recoveredEntry1 =
-        createTransaction(Op.saveFrameworkId(new SaveFrameworkId(frameworkId1)));
-    String frameworkId2 = "jim";
-    LogEntry recoveredEntry2 =
-        createTransaction(Op.saveFrameworkId(new SaveFrameworkId(frameworkId2)));
-    // This entry lacks a slave ID, and should therefore be discarded.
-    LogEntry recoveredEntry3 =
-        createTransaction(Op.saveHostAttributes(new SaveHostAttributes(new HostAttributes()
-            .setHost("host1")
-            .setMode(MaintenanceMode.DRAINED))));
-    IHostAttributes attributes = IHostAttributes.build(new HostAttributes()
-        .setHost("host2")
-        .setSlaveId("slave2")
-        .setMode(MaintenanceMode.DRAINED));
-    LogEntry recoveredEntry4 =
-        createTransaction(Op.saveHostAttributes(new SaveHostAttributes(attributes.newBuilder())));
-    LogEntry recoveredEntry5 =
-        createTransaction(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory()));
-    expect(entry1.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(recoveredEntry1));
-    expect(entry2.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(recoveredEntry2));
-    expect(entry3.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(recoveredEntry3));
-    expect(entry4.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(recoveredEntry4));
-    expect(entry5.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(recoveredEntry5));
-    expect(stream.readAll()).andReturn(Iterators.forArray(entry1, entry2, entry3, entry4, entry5));
-
     final Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
     expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
         new IAnswer<Void>() {
@@ -245,9 +217,6 @@ public class LogStorageTest extends EasyMockTest {
             return null;
           }
         });
-    storageUtil.schedulerStore.saveFrameworkId(frameworkId1);
-    storageUtil.schedulerStore.saveFrameworkId(frameworkId2);
-    storageUtil.attributeStore.saveHostAttributes(attributes);
 
     final Capture<MutateWork<Void, RuntimeException>> initializationWork = createCapture();
     expect(storageUtil.storage.write(capture(initializationWork))).andAnswer(
@@ -279,7 +248,10 @@ public class LogStorageTest extends EasyMockTest {
             snapshotWork.getValue().apply(storageUtil.mutableStoreProvider);
             return null;
           }
-        }).times(5);
+        }).anyTimes();
+
+    // Populate all LogEntry types.
+    buildReplayLogEntries();
 
     control.replay();
 
@@ -289,6 +261,123 @@ public class LogStorageTest extends EasyMockTest {
 
     // Run the snapshot thread.
     snapshotAction.getValue().run();
+
+    // Assert all LogEntry types have handlers defined, except the three excluded below.
+    // Our current StreamManagerImpl.readFromBeginning() does not let those entries escape
+    // the decoding routine, which makes handling them in replay unnecessary.
+    assertEquals(
+        Sets.complementOf(EnumSet.of(
+            LogEntry._Fields.FRAME,
+            LogEntry._Fields.DEDUPLICATED_SNAPSHOT,
+            LogEntry._Fields.DEFLATED_ENTRY)),
+        EnumSet.copyOf(logStorage.buildLogEntryReplayActions().keySet()));
+
+    // Assert all Transaction types have handlers defined.
+    assertEquals(
+        EnumSet.allOf(Op._Fields.class),
+        EnumSet.copyOf(logStorage.buildTransactionReplayActions().keySet()));
+  }
+
+  private void buildReplayLogEntries() throws Exception {
+    ImmutableSet.Builder<LogEntry> builder = ImmutableSet.builder();
+
+    builder.add(createTransaction(Op.saveFrameworkId(new SaveFrameworkId("bob"))));
+    storageUtil.schedulerStore.saveFrameworkId("bob");
+
+    SaveAcceptedJob acceptedJob =
+        new SaveAcceptedJob().setManagerId("CRON").setJobConfig(new JobConfiguration());
+    builder.add(createTransaction(Op.saveAcceptedJob(acceptedJob)));
+    storageUtil.jobStore.saveAcceptedJob(
+        acceptedJob.getManagerId(),
+        IJobConfiguration.build(acceptedJob.getJobConfig()));
+
+    RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
+    builder.add(createTransaction(Op.removeJob(removeJob)));
+    storageUtil.jobStore.removeJob(JOB_KEY);
+
+    SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(new ScheduledTask()));
+    builder.add(createTransaction(Op.saveTasks(saveTasks)));
+    storageUtil.taskStore.saveTasks(IScheduledTask.setFromBuilders(saveTasks.getTasks()));
+
+    RewriteTask rewriteTask = new RewriteTask("id1", new TaskConfig());
+    builder.add(createTransaction(Op.rewriteTask(rewriteTask)));
+    expect(storageUtil.taskStore.unsafeModifyInPlace(
+        rewriteTask.getTaskId(),
+        ITaskConfig.build(rewriteTask.getTask()))).andReturn(true);
+
+    RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.<String>of("taskId1"));
+    builder.add(createTransaction(Op.removeTasks(removeTasks)));
+    storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
+
+    SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), new ResourceAggregate());
+    builder.add(createTransaction(Op.saveQuota(saveQuota)));
+    storageUtil.quotaStore.saveQuota(
+        saveQuota.getRole(),
+        IResourceAggregate.build(saveQuota.getQuota()));
+
+    builder.add(createTransaction(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole()))));
+    storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
+
+    // This entry lacks a slave ID, and should therefore be discarded.
+    SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
+        .setHost("host1")
+        .setMode(MaintenanceMode.DRAINED));
+    builder.add(createTransaction(Op.saveHostAttributes(hostAttributes1)));
+
+    SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
+        .setHost("host2")
+        .setSlaveId("slave2")
+        .setMode(MaintenanceMode.DRAINED));
+    builder.add(createTransaction(Op.saveHostAttributes(hostAttributes2)));
+    storageUtil.attributeStore.saveHostAttributes(
+        IHostAttributes.build(hostAttributes2.getHostAttributes()));
+
+    SaveLock saveLock = new SaveLock(new Lock().setKey(LockKey.job(JOB_KEY.newBuilder())));
+    builder.add(createTransaction(Op.saveLock(saveLock)));
+    storageUtil.lockStore.saveLock(ILock.build(saveLock.getLock()));
+
+    RemoveLock removeLock = new RemoveLock(LockKey.job(JOB_KEY.newBuilder()));
+    builder.add(createTransaction(Op.removeLock(removeLock)));
+    storageUtil.lockStore.removeLock(ILockKey.build(removeLock.getLockKey()));
+
+    SaveJobUpdate saveUpdate = new SaveJobUpdate(new JobUpdate(), "token");
+    builder.add(createTransaction(Op.saveJobUpdate(saveUpdate)));
+    storageUtil.jobUpdateStore.saveJobUpdate(
+        IJobUpdate.build(saveUpdate.getJobUpdate()),
+        Optional.of(saveUpdate.getLockToken()));
+
+    SaveJobUpdateEvent saveUpdateEvent = new SaveJobUpdateEvent(new JobUpdateEvent(), "update");
+    builder.add(createTransaction(Op.saveJobUpdateEvent(saveUpdateEvent)));
+    storageUtil.jobUpdateStore.saveJobUpdateEvent(
+        IJobUpdateEvent.build(saveUpdateEvent.getEvent()),
+        saveUpdateEvent.getUpdateId());
+
+    SaveJobInstanceUpdateEvent saveInstanceEvent =
+        new SaveJobInstanceUpdateEvent(new JobInstanceUpdateEvent(), "update");
+    builder.add(createTransaction(Op.saveJobInstanceUpdateEvent(saveInstanceEvent)));
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
+        IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()),
+        saveInstanceEvent.getUpdateId());
+
+    builder.add(createTransaction(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L))));
+    expect(storageUtil.jobUpdateStore.pruneHistory(5, 10L)).andReturn(ImmutableSet.of("id2"));
+
+    // NOOP LogEntry
+    builder.add(LogEntry.noop(true));
+
+    // Snapshot LogEntry
+    Snapshot snapshot = new Snapshot();
+    builder.add(LogEntry.snapshot(snapshot));
+    snapshotStore.applySnapshot(snapshot);
+
+    ImmutableSet.Builder<Entry> entryBuilder = ImmutableSet.builder();
+    for (LogEntry logEntry : builder.build()) {
+      Entry entry = createMock(Entry.class);
+      entryBuilder.add(entry);
+      expect(entry.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(logEntry));
+    }
+
+    expect(stream.readAll()).andReturn(entryBuilder.build().iterator());
   }
 
   abstract class StorageTestFixture {