You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/11/07 23:07:24 UTC
incubator-aurora git commit: Fixing the log replay for the job update
history pruner.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 2d68bc690 -> afee887af
Fixing the log replay for the job update history pruner.
Bugs closed: AURORA-912
Reviewed at https://reviews.apache.org/r/27598/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/afee887a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/afee887a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/afee887a
Branch: refs/heads/master
Commit: afee887af5de085c9a422cb0d63e8a382673b8c5
Parents: 2d68bc6
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Nov 7 14:05:11 2014 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Nov 7 14:05:11 2014 -0800
----------------------------------------------------------------------
.../scheduler/storage/log/LogStorage.java | 301 +++++++++++--------
.../scheduler/storage/log/LogStorageTest.java | 159 +++++++---
2 files changed, 298 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/afee887a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index cbab759..0195557 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -19,6 +19,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Date;
+import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@@ -29,9 +30,9 @@ import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+
import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
import com.twitter.common.base.Closure;
import com.twitter.common.inject.TimedInterceptor.Timed;
import com.twitter.common.quantity.Amount;
@@ -221,10 +222,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
@Qualifier
public @interface WriteBehind { }
- // TODO(wfarner): This is a temporary emergency fix. Revisit.
- @CmdLine(name = "log_storage_fail_on_unknown_op",
- help = "If true, fail if an unknown log transition operation is encountered.")
- public static final Arg<Boolean> FAIL_ON_UNKNOWN_OP = Arg.create(false);
+ private final Map<LogEntry._Fields, Closure<LogEntry>> logEntryReplayActions;
+ private final Map<Op._Fields, Closure<Op>> transactionReplayActions;
@Inject
LogStorage(LogManager logManager,
@@ -306,6 +305,164 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
quotaStore,
attributeStore,
jobUpdateStore);
+
+ this.logEntryReplayActions = buildLogEntryReplayActions();
+ this.transactionReplayActions = buildTransactionReplayActions();
+ }
+
+ @VisibleForTesting
+ final Map<LogEntry._Fields, Closure<LogEntry>> buildLogEntryReplayActions() {
+
+ return ImmutableMap.<LogEntry._Fields, Closure<LogEntry>>builder()
+ .put(LogEntry._Fields.SNAPSHOT, new Closure<LogEntry>() {
+ @Override
+ public void execute(LogEntry logEntry) throws RuntimeException {
+ Snapshot snapshot = logEntry.getSnapshot();
+ LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
+ snapshotStore.applySnapshot(snapshot);
+ }
+ })
+ .put(LogEntry._Fields.TRANSACTION, new Closure<LogEntry>() {
+ @Override
+ public void execute(final LogEntry logEntry) throws RuntimeException {
+ write(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider unused) {
+ for (Op op : logEntry.getTransaction().getOps()) {
+ replayOp(op);
+ }
+ }
+ });
+ }
+ })
+ .put(LogEntry._Fields.NOOP, new Closure<LogEntry>() {
+ @Override
+ public void execute(LogEntry item) throws RuntimeException {
+ // Nothing to do here
+ }
+ }).build();
+ }
+
+ @VisibleForTesting
+ final Map<Op._Fields, Closure<Op>> buildTransactionReplayActions() {
+
+ return ImmutableMap.<Op._Fields, Closure<Op>>builder()
+ .put(Op._Fields.SAVE_FRAMEWORK_ID, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId());
+ }
+ })
+ .put(Op._Fields.SAVE_ACCEPTED_JOB, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ SaveAcceptedJob acceptedJob = op.getSaveAcceptedJob();
+ writeBehindJobStore.saveAcceptedJob(
+ acceptedJob.getManagerId(),
+ IJobConfiguration.build(acceptedJob.getJobConfig()));
+ }
+ })
+ .put(Op._Fields.REMOVE_JOB, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
+ }
+ })
+ .put(Op._Fields.SAVE_TASKS, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindTaskStore.saveTasks(
+ IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
+ }
+ })
+ .put(Op._Fields.REWRITE_TASK, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ RewriteTask rewriteTask = op.getRewriteTask();
+ writeBehindTaskStore.unsafeModifyInPlace(
+ rewriteTask.getTaskId(),
+ ITaskConfig.build(rewriteTask.getTask()));
+ }
+ })
+ .put(Op._Fields.REMOVE_TASKS, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds());
+ }
+ })
+ .put(Op._Fields.SAVE_QUOTA, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ SaveQuota saveQuota = op.getSaveQuota();
+ writeBehindQuotaStore.saveQuota(
+ saveQuota.getRole(),
+ IResourceAggregate.build(saveQuota.getQuota()));
+ }
+ })
+ .put(Op._Fields.REMOVE_QUOTA, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole());
+ }
+ })
+ .put(Op._Fields.SAVE_HOST_ATTRIBUTES, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
+ // Prior to commit 5cf760b, the store would persist maintenance mode changes for
+ // unknown hosts. 5cf760b began rejecting these, but the replicated log may still
+ // contain entries with a null slave ID.
+ if (attributes.isSetSlaveId()) {
+ writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
+ } else {
+ LOG.info("Dropping host attributes with no slave ID: " + attributes);
+ }
+ }
+ })
+ .put(Op._Fields.SAVE_LOCK, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock()));
+ }
+ })
+ .put(Op._Fields.REMOVE_LOCK, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
+ }
+ })
+ .put(Op._Fields.SAVE_JOB_UPDATE, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindJobUpdateStore.saveJobUpdate(
+ IJobUpdate.build(op.getSaveJobUpdate().getJobUpdate()),
+ Optional.fromNullable(op.getSaveJobUpdate().getLockToken()));
+ }
+ })
+ .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindJobUpdateStore.saveJobUpdateEvent(
+ IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()),
+ op.getSaveJobUpdateEvent().getUpdateId());
+ }
+ })
+ .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(IJobInstanceUpdateEvent.build(
+ op.getSaveJobInstanceUpdateEvent().getEvent()),
+ op.getSaveJobInstanceUpdateEvent().getUpdateId());
+ }
+ })
+ .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, new Closure<Op>() {
+ @Override
+ public void execute(Op op) throws RuntimeException {
+ writeBehindJobUpdateStore.pruneHistory(
+ op.getPruneJobUpdateHistory().getPerJobRetainCount(),
+ op.getPruneJobUpdateHistory().getHistoryPruneThresholdMs());
+ }
+ }).build();
}
@Override
@@ -367,132 +524,22 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
}
}
- void replay(final LogEntry logEntry) {
- switch (logEntry.getSetField()) {
- case SNAPSHOT:
- Snapshot snapshot = logEntry.getSnapshot();
- LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
- snapshotStore.applySnapshot(snapshot);
- break;
-
- case TRANSACTION:
- write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider unused) {
- for (Op op : logEntry.getTransaction().getOps()) {
- replayOp(op);
- }
- }
- });
- break;
-
- case NOOP:
- // Nothing to do here
- break;
-
- case DEFLATED_ENTRY:
- throw new IllegalArgumentException("Deflated entries are not handled at this layer.");
-
- case FRAME:
- throw new IllegalArgumentException("Framed entries are not handled at this layer.");
-
- case DEDUPLICATED_SNAPSHOT:
- throw new IllegalArgumentException("Deduplicated snapshots are not handled at this layer.");
-
- default:
- throw new IllegalStateException("Unknown log entry type: " + logEntry);
+ private void replay(final LogEntry logEntry) {
+ LogEntry._Fields entryField = logEntry.getSetField();
+ if (!logEntryReplayActions.containsKey(entryField)) {
+ throw new IllegalStateException("Unknown log entry type: " + entryField);
}
+
+ logEntryReplayActions.get(entryField).execute(logEntry);
}
private void replayOp(Op op) {
- switch (op.getSetField()) {
- case SAVE_FRAMEWORK_ID:
- writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId());
- break;
-
- case SAVE_ACCEPTED_JOB:
- SaveAcceptedJob acceptedJob = op.getSaveAcceptedJob();
- writeBehindJobStore.saveAcceptedJob(
- acceptedJob.getManagerId(),
- IJobConfiguration.build(acceptedJob.getJobConfig()));
- break;
-
- case REMOVE_JOB:
- writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
- break;
-
- case SAVE_TASKS:
- writeBehindTaskStore.saveTasks(
- IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
- break;
-
- case REWRITE_TASK:
- RewriteTask rewriteTask = op.getRewriteTask();
- writeBehindTaskStore.unsafeModifyInPlace(
- rewriteTask.getTaskId(),
- ITaskConfig.build(rewriteTask.getTask()));
- break;
-
- case REMOVE_TASKS:
- writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds());
- break;
-
- case SAVE_QUOTA:
- SaveQuota saveQuota = op.getSaveQuota();
- writeBehindQuotaStore.saveQuota(
- saveQuota.getRole(),
- IResourceAggregate.build(saveQuota.getQuota()));
- break;
-
- case REMOVE_QUOTA:
- writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole());
- break;
-
- case SAVE_HOST_ATTRIBUTES:
- HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
- // Prior to commit 5cf760b, the store would persist maintenance mode changes for
- // unknown hosts. 5cf760b began rejecting these, but the replicated log may still
- // contain entries with a null slave ID.
- if (attributes.isSetSlaveId()) {
- writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
- } else {
- LOG.info("Dropping host attributes with no slave ID: " + attributes);
- }
- break;
-
- case SAVE_LOCK:
- writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock()));
- break;
-
- case REMOVE_LOCK:
- writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
- break;
-
- case SAVE_JOB_UPDATE:
- writeBehindJobUpdateStore.saveJobUpdate(
- IJobUpdate.build(op.getSaveJobUpdate().getJobUpdate()),
- Optional.fromNullable(op.getSaveJobUpdate().getLockToken()));
- break;
-
- case SAVE_JOB_UPDATE_EVENT:
- writeBehindJobUpdateStore.saveJobUpdateEvent(
- IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()),
- op.getSaveJobUpdateEvent().getUpdateId());
- break;
-
- case SAVE_JOB_INSTANCE_UPDATE_EVENT:
- writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(IJobInstanceUpdateEvent.build(
- op.getSaveJobInstanceUpdateEvent().getEvent()),
- op.getSaveJobInstanceUpdateEvent().getUpdateId());
- break;
-
- default:
- if (FAIL_ON_UNKNOWN_OP.get()) {
- throw new IllegalStateException("Unknown transaction op: " + op);
- } else {
- LOG.log(Level.SEVERE, "Unkown transaction op: " + op);
- }
+ Op._Fields opField = op.getSetField();
+ if (!transactionReplayActions.containsKey(opField)) {
+ throw new IllegalStateException("Unknown transaction op: " + opField);
}
+
+ transactionReplayActions.get(opField).execute(op);
}
private void scheduleSnapshots() {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/afee887a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 8eb5c3f..55a4784 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -13,6 +13,7 @@
*/
package org.apache.aurora.scheduler.storage.log;
+import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -22,9 +23,11 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.testing.TearDown;
+
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Data;
import com.twitter.common.quantity.Time;
@@ -205,37 +208,6 @@ public class LogStorageTest extends EasyMockTest {
}
});
- Entry entry1 = createMock(Entry.class);
- Entry entry2 = createMock(Entry.class);
- Entry entry3 = createMock(Entry.class);
- Entry entry4 = createMock(Entry.class);
- Entry entry5 = createMock(Entry.class);
- String frameworkId1 = "bob";
- LogEntry recoveredEntry1 =
- createTransaction(Op.saveFrameworkId(new SaveFrameworkId(frameworkId1)));
- String frameworkId2 = "jim";
- LogEntry recoveredEntry2 =
- createTransaction(Op.saveFrameworkId(new SaveFrameworkId(frameworkId2)));
- // This entry lacks a slave ID, and should therefore be discarded.
- LogEntry recoveredEntry3 =
- createTransaction(Op.saveHostAttributes(new SaveHostAttributes(new HostAttributes()
- .setHost("host1")
- .setMode(MaintenanceMode.DRAINED))));
- IHostAttributes attributes = IHostAttributes.build(new HostAttributes()
- .setHost("host2")
- .setSlaveId("slave2")
- .setMode(MaintenanceMode.DRAINED));
- LogEntry recoveredEntry4 =
- createTransaction(Op.saveHostAttributes(new SaveHostAttributes(attributes.newBuilder())));
- LogEntry recoveredEntry5 =
- createTransaction(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory()));
- expect(entry1.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(recoveredEntry1));
- expect(entry2.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(recoveredEntry2));
- expect(entry3.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(recoveredEntry3));
- expect(entry4.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(recoveredEntry4));
- expect(entry5.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(recoveredEntry5));
- expect(stream.readAll()).andReturn(Iterators.forArray(entry1, entry2, entry3, entry4, entry5));
-
final Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
new IAnswer<Void>() {
@@ -245,9 +217,6 @@ public class LogStorageTest extends EasyMockTest {
return null;
}
});
- storageUtil.schedulerStore.saveFrameworkId(frameworkId1);
- storageUtil.schedulerStore.saveFrameworkId(frameworkId2);
- storageUtil.attributeStore.saveHostAttributes(attributes);
final Capture<MutateWork<Void, RuntimeException>> initializationWork = createCapture();
expect(storageUtil.storage.write(capture(initializationWork))).andAnswer(
@@ -279,7 +248,10 @@ public class LogStorageTest extends EasyMockTest {
snapshotWork.getValue().apply(storageUtil.mutableStoreProvider);
return null;
}
- }).times(5);
+ }).anyTimes();
+
+ // Populate all LogEntry types.
+ buildReplayLogEntries();
control.replay();
@@ -289,6 +261,123 @@ public class LogStorageTest extends EasyMockTest {
// Run the snapshot thread.
snapshotAction.getValue().run();
+
+ // Assert all LogEntry types have handlers defined.
+ // Our current StreamManagerImpl.readFromBeginning() does not let some entries escape
+ // the decoding routine making handling them in replay unnecessary.
+ assertEquals(
+ Sets.complementOf(EnumSet.of(
+ LogEntry._Fields.FRAME,
+ LogEntry._Fields.DEDUPLICATED_SNAPSHOT,
+ LogEntry._Fields.DEFLATED_ENTRY)),
+ EnumSet.copyOf(logStorage.buildLogEntryReplayActions().keySet()));
+
+ // Assert all Transaction types have handlers defined.
+ assertEquals(
+ EnumSet.allOf(Op._Fields.class),
+ EnumSet.copyOf(logStorage.buildTransactionReplayActions().keySet()));
+ }
+
+ private void buildReplayLogEntries() throws Exception {
+ ImmutableSet.Builder<LogEntry> builder = ImmutableSet.builder();
+
+ builder.add(createTransaction(Op.saveFrameworkId(new SaveFrameworkId("bob"))));
+ storageUtil.schedulerStore.saveFrameworkId("bob");
+
+ SaveAcceptedJob acceptedJob =
+ new SaveAcceptedJob().setManagerId("CRON").setJobConfig(new JobConfiguration());
+ builder.add(createTransaction(Op.saveAcceptedJob(acceptedJob)));
+ storageUtil.jobStore.saveAcceptedJob(
+ acceptedJob.getManagerId(),
+ IJobConfiguration.build(acceptedJob.getJobConfig()));
+
+ RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
+ builder.add(createTransaction(Op.removeJob(removeJob)));
+ storageUtil.jobStore.removeJob(JOB_KEY);
+
+ SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(new ScheduledTask()));
+ builder.add(createTransaction(Op.saveTasks(saveTasks)));
+ storageUtil.taskStore.saveTasks(IScheduledTask.setFromBuilders(saveTasks.getTasks()));
+
+ RewriteTask rewriteTask = new RewriteTask("id1", new TaskConfig());
+ builder.add(createTransaction(Op.rewriteTask(rewriteTask)));
+ expect(storageUtil.taskStore.unsafeModifyInPlace(
+ rewriteTask.getTaskId(),
+ ITaskConfig.build(rewriteTask.getTask()))).andReturn(true);
+
+ RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.<String>of("taskId1"));
+ builder.add(createTransaction(Op.removeTasks(removeTasks)));
+ storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
+
+ SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), new ResourceAggregate());
+ builder.add(createTransaction(Op.saveQuota(saveQuota)));
+ storageUtil.quotaStore.saveQuota(
+ saveQuota.getRole(),
+ IResourceAggregate.build(saveQuota.getQuota()));
+
+ builder.add(createTransaction(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole()))));
+ storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
+
+ // This entry lacks a slave ID, and should therefore be discarded.
+ SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
+ .setHost("host1")
+ .setMode(MaintenanceMode.DRAINED));
+ builder.add(createTransaction(Op.saveHostAttributes(hostAttributes1)));
+
+ SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
+ .setHost("host2")
+ .setSlaveId("slave2")
+ .setMode(MaintenanceMode.DRAINED));
+ builder.add(createTransaction(Op.saveHostAttributes(hostAttributes2)));
+ storageUtil.attributeStore.saveHostAttributes(
+ IHostAttributes.build(hostAttributes2.getHostAttributes()));
+
+ SaveLock saveLock = new SaveLock(new Lock().setKey(LockKey.job(JOB_KEY.newBuilder())));
+ builder.add(createTransaction(Op.saveLock(saveLock)));
+ storageUtil.lockStore.saveLock(ILock.build(saveLock.getLock()));
+
+ RemoveLock removeLock = new RemoveLock(LockKey.job(JOB_KEY.newBuilder()));
+ builder.add(createTransaction(Op.removeLock(removeLock)));
+ storageUtil.lockStore.removeLock(ILockKey.build(removeLock.getLockKey()));
+
+ SaveJobUpdate saveUpdate = new SaveJobUpdate(new JobUpdate(), "token");
+ builder.add(createTransaction(Op.saveJobUpdate(saveUpdate)));
+ storageUtil.jobUpdateStore.saveJobUpdate(
+ IJobUpdate.build(saveUpdate.getJobUpdate()),
+ Optional.of(saveUpdate.getLockToken()));
+
+ SaveJobUpdateEvent saveUpdateEvent = new SaveJobUpdateEvent(new JobUpdateEvent(), "update");
+ builder.add(createTransaction(Op.saveJobUpdateEvent(saveUpdateEvent)));
+ storageUtil.jobUpdateStore.saveJobUpdateEvent(
+ IJobUpdateEvent.build(saveUpdateEvent.getEvent()),
+ saveUpdateEvent.getUpdateId());
+
+ SaveJobInstanceUpdateEvent saveInstanceEvent =
+ new SaveJobInstanceUpdateEvent(new JobInstanceUpdateEvent(), "update");
+ builder.add(createTransaction(Op.saveJobInstanceUpdateEvent(saveInstanceEvent)));
+ storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
+ IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()),
+ saveInstanceEvent.getUpdateId());
+
+ builder.add(createTransaction(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L))));
+ expect(storageUtil.jobUpdateStore.pruneHistory(5, 10L)).andReturn(ImmutableSet.of("id2"));
+
+ // NOOP LogEntry
+ builder.add(LogEntry.noop(true));
+
+ // Snapshot LogEntry
+ Snapshot snapshot = new Snapshot();
+ builder.add(LogEntry.snapshot(snapshot));
+ snapshotStore.applySnapshot(snapshot);
+
+ ImmutableSet.Builder<Entry> entryBuilder = ImmutableSet.builder();
+ for (LogEntry logEntry : builder.build()) {
+ Entry entry = createMock(Entry.class);
+ entryBuilder.add(entry);
+ expect(entry.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(logEntry));
+ }
+
+ expect(stream.readAll()).andReturn(entryBuilder.build().iterator());
}
abstract class StorageTestFixture {