You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:23 UTC

[30/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
new file mode 100644
index 0000000..fdc9ae7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.SchedulerException;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+
+/**
+ * Manages scheduler storage operations providing an interface to perform atomic changes.
+ */
+public interface Storage {
+
+  interface StoreProvider {
+    SchedulerStore getSchedulerStore();
+    JobStore getJobStore();
+    TaskStore getTaskStore();
+    LockStore getLockStore();
+    QuotaStore getQuotaStore();
+    AttributeStore getAttributeStore();
+  }
+
+  interface MutableStoreProvider extends StoreProvider {
+    SchedulerStore.Mutable getSchedulerStore();
+    JobStore.Mutable getJobStore();
+
+    /**
+     * Gets access to the mutable task store.
+     * <p>
+     * This is labeled as unsafe, since it's rare that a caller should be using this.  In most
+     * cases, mutations to the task store should be done through
+     * {@link com.twitter.aurora.scheduler.state.StateManager}.
+     * <p>
+     * TODO(William Farner): Come up with a way to restrict access to this interface.  As it stands,
+     * it's trivial for an unsuspecting caller to modify the task store directly and subvert the
+     * state machine and side effect systems.
+     *
+     * @return The mutable task store.
+     */
+    TaskStore.Mutable getUnsafeTaskStore();
+
+    LockStore.Mutable getLockStore();
+    QuotaStore.Mutable getQuotaStore();
+    AttributeStore.Mutable getAttributeStore();
+  }
+
+  /**
+   * Encapsulates a read-only storage operation.
+   *
+   * @param <T> The type of result this unit of work produces.
+   * @param <E> The type of exception this unit of work can throw.
+   */
+  interface Work<T, E extends Exception> {
+
+    /**
+     * Abstracts a unit of work that has a result, but may also throw a specific exception.
+     *
+     * @param storeProvider A provider to give access to different available stores.
+     * @return the result of the successfully completed unit of work
+     * @throws E if the unit of work could not be completed
+     */
+    T apply(StoreProvider storeProvider) throws E;
+
+    /**
+     * A convenient typedef for Work that throws no checked exceptions - it runs quietly.
+     *
+     * @param <T> The type of result this unit of work produces.
+     */
+    interface Quiet<T> extends Work<T, RuntimeException> {
+      // typedef
+    }
+  }
+
+  /**
+   * Encapsulates a storage operation, which has mutable storage access.
+   *
+   * @param <T> The type of result this unit of work produces.
+   * @param <E> The type of exception this unit of work can throw.
+   */
+  interface MutateWork<T, E extends Exception> {
+
+    NoResult.Quiet NOOP = new NoResult.Quiet() {
+      @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
+        // No-op.
+      }
+    };
+
+    /**
+     * Abstracts a unit of work that should either commit a set of changes to storage as a side
+     * effect of successful completion or else commit no changes at all when an exception is thrown.
+     *
+     * @param storeProvider A provider to give access to different available stores.
+     * @return the result of the successfully completed unit of work
+     * @throws E if the unit of work could not be completed
+     */
+    T apply(MutableStoreProvider storeProvider) throws E;
+
+    /**
+     * A convenient typedef for Work that throws no checked exceptions - it runs quietly.
+     *
+     * @param <T> The type of result this unit of work produces.
+     */
+    interface Quiet<T> extends MutateWork<T, RuntimeException> {
+      // typedef
+    }
+
+    /**
+     * Encapsulates work that returns no result.
+     *
+     * @param <E> The type of exception this unit of work can throw.
+     */
+    abstract class NoResult<E extends Exception> implements MutateWork<Void, E> {
+
+      @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
+        execute(storeProvider);
+        return null;
+      }
+
+      /**
+       * Similar to {@link #apply(MutableStoreProvider)} except that no result is
+       * returned.
+       *
+       * @param storeProvider A provider to give access to different available stores.
+       * @throws E if the unit of work could not be completed
+       */
+      protected abstract void execute(MutableStoreProvider storeProvider) throws E;
+
+      /**
+       * A convenient typedef for Work with no result that throws no checked exceptions - it runs
+       * quietly.
+       */
+      public abstract static class Quiet extends NoResult<RuntimeException> {
+        // typedef
+      }
+    }
+  }
+
+  /**
+   * Indicates a problem reading from or writing to stable storage.
+   */
+  class StorageException extends SchedulerException {
+    public StorageException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public StorageException(String message) {
+      super(message);
+    }
+  }
+
+  /**
+   * Executes the unit of read-only {@code work}.  All data in the stores may be expected to be
+   * consistent, as the invocation is mutually exclusive of any writes.
+   *
+   * @param work The unit of work to execute.
+   * @param <T> The type of result this unit of work produces.
+   * @param <E> The type of exception this unit of work can throw.
+   * @return the result when the unit of work completes successfully
+   * @throws StorageException if there was a problem reading from stable storage.
+   * @throws E bubbled transparently when the unit of work throws
+   */
+  <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E;
+
+  /**
+   * Executes a unit of read-only {@code work}.  This is functionally identical to
+   * {@link #consistentRead(Work)} with the exception that data in the stores may not be fully
+   * consistent.
+   *
+   * @param work The unit of work to execute.
+   * @param <T> The type of result this unit of work produces.
+   * @param <E> The type of exception this unit of work can throw.
+   * @return the result when the unit of work completes successfully
+   * @throws StorageException if there was a problem reading from stable storage.
+   * @throws E bubbled transparently when the unit of work throws
+   */
+  <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work) throws StorageException, E;
+
+  /**
+   * Executes the unit of mutating {@code work}.
+   *
+   * @param work The unit of work to execute.
+   * @param <T> The type of result this unit of work produces.
+   * @param <E> The type of exception this unit of work can throw.
+   * @return the result when the unit of work completes successfully
+   * @throws StorageException if there was a problem reading from or writing to stable storage.
+   * @throws E bubbled transparently when the unit of work throws
+   */
+  <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E;
+
+  /**
+   * Clean up the underlying storage by optimizing internal data structures. Does not change
+   * externally-visible state but might not run concurrently with write operations.
+   */
+  void snapshot() throws StorageException;
+
+  /**
+   * A non-volatile storage that has additional methods to control its lifecycle.
+   */
+  interface NonVolatileStorage extends Storage {
+    /**
+     * Requests the underlying storage prepare its data set; ie: initialize schemas, begin syncing
+     * out of date data, etc.  This method should not block.
+     *
+     * @throws StorageException if there was a problem preparing storage.
+     */
+    void prepare() throws StorageException;
+
+    /**
+     * Prepares the underlying storage for serving traffic.
+     *
+     * @param initializationLogic work to perform after this storage system is ready but before
+     *     allowing general use of
+     *     {@link #consistentRead}.
+     * @throws StorageException if there was a problem starting storage.
+     */
+    void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException;
+
+    /**
+     * Prepares the underlying storage system for clean shutdown.
+     */
+    void stop();
+  }
+
+  /**
+   * Identifies a storage layer that is in-memory only.
+   * This generally should only be used when the storage is first starting up, to perform queries
+   * related to initially loading the storage.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ ElementType.PARAMETER, ElementType.METHOD })
+  @BindingAnnotation
+  public @interface Volatile { }
+
+  /**
+   * Utility functions for interacting with a Storage instance.
+   */
+  public final class Util {
+
+    private Util() {
+      // Utility class.
+    }
+
+    /**
+     * Fetch tasks matching the query returned by {@code query} from {@code storage} in a
+     * consistent read operation.
+     *
+     * @see #weaklyConsistentFetchTasks
+     * @param storage Storage instance to query from.
+     * @param query Builder of the query to perform.
+     * @return Tasks returned from the query.
+     */
+    public static ImmutableSet<IScheduledTask> consistentFetchTasks(
+        Storage storage,
+        final Query.Builder query) {
+
+      return storage.consistentRead(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
+        @Override public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
+          return storeProvider.getTaskStore().fetchTasks(query);
+        }
+      });
+    }
+
+    /**
+     * Identical to {@link #consistentFetchTasks(Storage, Query.Builder)}, but fetches tasks using a
+     * weakly-consistent read operation.
+     *
+     * @see #consistentFetchTasks
+     * @param storage Storage instance to query from.
+     * @param query Builder of the query to perform.
+     * @return Tasks returned from the query.
+     */
+    public static ImmutableSet<IScheduledTask> weaklyConsistentFetchTasks(
+        Storage storage,
+        final Query.Builder query) {
+
+      return storage.weaklyConsistentRead(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
+        @Override public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
+          return storeProvider.getTaskStore().fetchTasks(query);
+        }
+      });
+    }
+
+    /**
+     * Fetch quota for {@code role} from {@code storage} in a consistent read operation.
+     *
+     * @param storage Storage instance to fetch quota from.
+     * @param role Role to fetch quota for.
+     * @return Quota returned from the fetch operation.
+     * @see QuotaStore#fetchQuota(String)
+     */
+    public static Optional<IQuota> consistentFetchQuota(Storage storage, final String role) {
+      return storage.consistentRead(new Work.Quiet<Optional<IQuota>>() {
+        @Override public Optional<IQuota> apply(StoreProvider storeProvider) {
+          return storeProvider.getQuotaStore().fetchQuota(role);
+        }
+      });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
new file mode 100644
index 0000000..df5b603
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import com.twitter.aurora.gen.JobConfiguration;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.gen.TaskConfig;
+import com.twitter.aurora.gen.TaskEvent;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+/**
+ * Utility class to contain and perform storage backfill operations.
+ */
+public final class StorageBackfill {
+
+  private static final Logger LOG = Logger.getLogger(StorageBackfill.class.getName());
+
+  private static final AtomicLong SHARD_SANITY_CHECK_FAILS =
+      Stats.exportLong("shard_sanity_check_failures");
+
+  private StorageBackfill() {
+    // Utility class.
+  }
+
+  private static void backfillJobDefaults(JobStore.Mutable jobStore) {
+    for (String id : jobStore.fetchManagerIds()) {
+      for (JobConfiguration job : IJobConfiguration.toBuildersList(jobStore.fetchJobs(id))) {
+        ConfigurationManager.applyDefaultsIfUnset(job);
+        jobStore.saveAcceptedJob(id, IJobConfiguration.build(job));
+      }
+    }
+  }
+
+  private static void guaranteeShardUniqueness(
+      ScheduledTask task,
+      TaskStore.Mutable taskStore,
+      Clock clock) {
+
+    if (Tasks.isActive(task.getStatus())) {
+      // Perform a sanity check on the number of active shards.
+      TaskConfig config = task.getAssignedTask().getTask();
+      Query.Builder query = Query.instanceScoped(
+          JobKeys.from(config.getOwner().getRole(), config.getEnvironment(), config.getJobName()),
+          task.getAssignedTask().getInstanceId())
+          .active();
+      Set<String> activeTasksInShard = FluentIterable.from(taskStore.fetchTasks(query))
+          .transform(Tasks.SCHEDULED_TO_ID)
+          .toSet();
+
+      if (activeTasksInShard.size() > 1) {
+        SHARD_SANITY_CHECK_FAILS.incrementAndGet();
+        LOG.severe("Active shard sanity check failed when loading " + Tasks.id(task)
+            + ", active tasks found: " + activeTasksInShard);
+
+        // We want to keep exactly one task from this shard, so sort the IDs and keep the
+        // highest (newest) in the hopes that it is legitimately running.
+        String newestTask = Iterables.getLast(Sets.newTreeSet(activeTasksInShard));
+        if (!Tasks.id(task).equals(newestTask)) {
+          task.setStatus(ScheduleStatus.KILLED);
+          task.addToTaskEvents(new TaskEvent(clock.nowMillis(), ScheduleStatus.KILLED)
+              .setMessage("Killed duplicate shard."));
+          // TODO(wfarner): Circle back if this is necessary.  Currently there's a race
+          // condition between the time the scheduler is actually available without hitting
+          // IllegalStateException (see DriverImpl).
+          // driver.killTask(Tasks.id(task));
+        } else {
+          LOG.info("Retaining task " + Tasks.id(task));
+        }
+      }
+    }
+  }
+
+  private static final AtomicLong BOTH_FIELDS_SET = Stats.exportLong("both_instance_ids_set");
+  private static final AtomicLong OLD_FIELD_SET = Stats.exportLong("old_instance_id_set");
+  private static final AtomicLong NEW_FIELD_SET = Stats.exportLong("new_instance_id_set");
+  private static final AtomicLong FIELDS_INCONSISTENT =
+      Stats.exportLong("instance_ids_inconsistent");
+
+  /**
+   * Ensures backwards-compatibility of the throttled state, which exists in this version but is
+   * not handled.
+   *
+   * @param task Task to possibly rewrite.
+   */
+  private static void rewriteThrottledState(ScheduledTask task) {
+    if (ScheduleStatus.THROTTLED == task.getStatus()) {
+      task.setStatus(ScheduleStatus.PENDING);
+    }
+  }
+
+  /**
+   * Backfills the storage to make it match any assumptions that may have changed since
+   * the structs were first written.
+   *
+   * @param storeProvider Storage provider.
+   * @param clock Clock, used for timestamping backfilled task events.
+   */
+  public static void backfill(final MutableStoreProvider storeProvider, final Clock clock) {
+    backfillJobDefaults(storeProvider.getJobStore());
+
+    LOG.info("Performing shard uniqueness sanity check.");
+    storeProvider.getUnsafeTaskStore().mutateTasks(Query.unscoped(), new TaskMutation() {
+      @Override public IScheduledTask apply(final IScheduledTask task) {
+        ScheduledTask builder = task.newBuilder();
+        ConfigurationManager.applyDefaultsIfUnset(builder.getAssignedTask().getTask());
+        // TODO(ksweeney): Guarantee tasks pass current validation code here and quarantine if they
+        // don't.
+        guaranteeShardUniqueness(builder, storeProvider.getUnsafeTaskStore(), clock);
+        rewriteThrottledState(builder);
+        return IScheduledTask.build(builder);
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
new file mode 100644
index 0000000..02e5096
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * Stores all tasks configured with the scheduler.
+ */
+public interface TaskStore {
+
+  /**
+   * Fetches a read-only view of tasks matching a query and filters. Intended for use with a
+   * {@link com.twitter.aurora.scheduler.base.Query.Builder}.
+   *
+   * @param query Builder of the query to identify tasks with.
+   * @return A read-only view of matching tasks.
+   */
+  ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query);
+
+  public interface Mutable extends TaskStore {
+
+    /**
+     * A convenience interface to allow callers to more concisely implement a task mutation.
+     */
+    public interface TaskMutation extends Function<IScheduledTask, IScheduledTask> {
+    }
+
+    /**
+     * Saves tasks to the store.  Tasks are copied internally, meaning that the tasks are stored in
+     * the state they were in when the method is called, and further object modifications will not
+     * affect the tasks.  If any of the tasks already exist in the store, they will be overwritten
+     * by the supplied {@code tasks}.
+     *
+     * @param tasks Tasks to add.
+     */
+    void saveTasks(Set<IScheduledTask> tasks);
+
+    /**
+     * Removes all tasks from the store.
+     */
+    void deleteAllTasks();
+
+    /**
+     * Deletes specific tasks from the store.
+     *
+     * @param taskIds IDs of tasks to delete.
+     */
+    void deleteTasks(Set<String> taskIds);
+
+    /**
+     * Offers temporary mutable access to tasks.  If a task ID is not found, it will be silently
+     * skipped, and no corresponding task will be returned.
+     *
+     * @param query Query to match tasks against.
+     * @param mutator The mutate operation.
+     * @return Immutable copies of only the tasks that were mutated.
+     */
+    ImmutableSet<IScheduledTask> mutateTasks(
+        Query.Builder query,
+        Function<IScheduledTask, IScheduledTask> mutator);
+
+    /**
+     * Rewrites a task's configuration in-place.
+     * <p>
+     * <b>WARNING</b>: this is a dangerous operation, and should not be used without exercising
+     * great care.  This feature should be used as a last-ditch effort to rewrite things that
+     * the scheduler otherwise can't (e.g. {@link ITaskConfig#executorConfig}) rewrite in a
+     * controlled/tested backfill operation.
+     *
+     * @param taskId ID of the task to alter.
+     * @param taskConfiguration Configuration object to swap with the existing task's
+     *                          configuration.
+     * @return {@code true} if the modification took effect, or {@code false} if the task does not
+     *         exist in the store.
+     */
+    boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
new file mode 100644
index 0000000..b6beba3
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.backup;
+
+import java.io.File;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.scheduler.storage.SnapshotStore;
+import com.twitter.aurora.scheduler.storage.backup.Recovery.RecoveryImpl;
+import com.twitter.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl;
+import com.twitter.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl.BackupConfig;
+import com.twitter.aurora.scheduler.storage.backup.TemporaryStorage.TemporaryStorageFactory;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.NotNull;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A module that will periodically save full storage backups to local disk and makes those backups
+ * available for on-line recovery.
+ */
+public class BackupModule extends PrivateModule {
+  private static final Logger LOG = Logger.getLogger(BackupModule.class.getName());
+
+  @CmdLine(name = "backup_interval", help = "Minimum interval on which to write a storage backup.")
+  private static final Arg<Amount<Long, Time>> BACKUP_INTERVAL =
+      Arg.create(Amount.of(1L, Time.HOURS));
+
+  @CmdLine(name = "max_saved_backups",
+      help = "Maximum number of backups to retain before deleting the oldest backups.")
+  private static final Arg<Integer> MAX_SAVED_BACKUPS = Arg.create(48);
+
+  @NotNull
+  @CmdLine(name = "backup_dir",
+      help = "Directory to store backups under. Will be created if it does not exist.")
+  private static final Arg<File> BACKUP_DIR = Arg.create();
+
+  private final Class<? extends SnapshotStore<Snapshot>> snapshotStore;
+  private final File unvalidatedBackupDir;
+
+  /**
+   * Creates a new backup module.
+   *
+   * @param snapshotStore Snapshot store implementation class.
+   */
+  public BackupModule(Class<? extends SnapshotStore<Snapshot>> snapshotStore) {
+    this(BACKUP_DIR.get(), snapshotStore);
+  }
+
+  /**
+   * Creates a new backup module using a given backupDir instead of a flagged one.
+   *
+   * @param backupDir Directory to write backups to.
+   * @param snapshotStore Snapshot store implementation class.
+   */
+  @VisibleForTesting
+  public BackupModule(File backupDir, Class<? extends SnapshotStore<Snapshot>> snapshotStore) {
+    this.unvalidatedBackupDir = checkNotNull(backupDir);
+    this.snapshotStore = checkNotNull(snapshotStore);
+  }
+
+  @Override
+  protected void configure() {
+    TypeLiteral<SnapshotStore<Snapshot>> type = new TypeLiteral<SnapshotStore<Snapshot>>() { };
+    bind(type).annotatedWith(StorageBackupImpl.SnapshotDelegate.class).to(snapshotStore);
+
+    bind(type).to(StorageBackupImpl.class);
+    bind(StorageBackup.class).to(StorageBackupImpl.class);
+    bind(StorageBackupImpl.class).in(Singleton.class);
+    expose(type);
+    expose(StorageBackup.class);
+
+    bind(new TypeLiteral<Function<Snapshot, TemporaryStorage>>() { })
+        .to(TemporaryStorageFactory.class);
+
+    bind(Command.class).to(LifecycleHook.class);
+    bind(Recovery.class).to(RecoveryImpl.class);
+    bind(RecoveryImpl.class).in(Singleton.class);
+    expose(Recovery.class);
+  }
+
+  static class LifecycleHook implements Command {
+    private final Lifecycle lifecycle;
+
+    @Inject LifecycleHook(Lifecycle lifecycle) {
+      this.lifecycle = checkNotNull(lifecycle);
+    }
+
+    @Override public void execute() {
+      lifecycle.shutdown();
+    }
+  }
+
+  @Provides
+  private File provideBackupDir() {
+    if (!unvalidatedBackupDir.exists()) {
+      if (!unvalidatedBackupDir.mkdirs()) {
+        throw new IllegalArgumentException(
+            "Unable to create backup dir " + unvalidatedBackupDir.getPath() + ".");
+      } else {
+        LOG.info("Created backup dir " + unvalidatedBackupDir.getPath() + ".");
+      }
+    }
+
+    if (!unvalidatedBackupDir.canWrite()) {
+      throw new IllegalArgumentException(
+          "Backup dir " + unvalidatedBackupDir.getPath() + " is not writable.");
+    }
+
+    return unvalidatedBackupDir;
+  }
+
+  @Provides
+  private BackupConfig provideBackupConfig(File backupDir) {
+    return new BackupConfig(backupDir, MAX_SAVED_BACKUPS.get(), BACKUP_INTERVAL.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
new file mode 100644
index 0000000..4e91342
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Atomics;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.DistributedSnapshotStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.base.Command;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A recovery mechanism that works with {@link StorageBackup} to provide a two-step storage
+ * recovery process.
+ */
+public interface Recovery {
+
+  /**
+   * List backups available for recovery.
+   *
+   * @return Available backup IDs.
+   */
+  Set<String> listBackups();
+
+  /**
+   * Loads a backup in 'staging' so that it may be queried and modified prior to committing.
+   *
+   * @param backupName Name of the backup to load.
+   * @throws RecoveryException If the backup could not be found or loaded.
+   */
+  void stage(String backupName) throws RecoveryException;
+
+  /**
+   * Queries a staged backup.
+   *
+   * @param query Builder of query to perform.
+   * @return Tasks matching the query.
+   * @throws RecoveryException If a backup is not staged, or could not be queried.
+   */
+  Set<IScheduledTask> query(Query.Builder query) throws RecoveryException;
+
+  /**
+   * Deletes tasks from a staged backup.
+   *
+   * @param query Query builder for tasks to delete.
+   * @throws RecoveryException If a backup is not staged, or tasks could not be deleted.
+   */
+  void deleteTasks(Query.Builder query) throws RecoveryException;
+
+  /**
+   * Unloads a staged backup.
+   */
+  void unload();
+
+  /**
+   * Commits a staged backup to the main storage system.
+   *
+   * @throws RecoveryException If a backup is not staged, or the commit failed.
+   */
+  void commit() throws RecoveryException;
+
+  /**
+   * Signals that a recovery operation failed, either because of an internal error or because
+   * the recovery steps were invoked in an improper order.
+   */
+  class RecoveryException extends Exception {
+    /**
+     * @param message Description of the failure.
+     */
+    RecoveryException(String message) {
+      super(message);
+    }
+
+    /**
+     * @param message Description of the failure.
+     * @param cause Underlying error that triggered the failure.
+     */
+    RecoveryException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  class RecoveryImpl implements Recovery {
+    private final File backupDir;
+    private final Function<Snapshot, TemporaryStorage> tempStorageFactory;
+    private final AtomicReference<PendingRecovery> recovery;
+    private final Storage primaryStorage;
+    private final DistributedSnapshotStore distributedStore;
+    private final Command shutDownNow;
+
+    @Inject
+    RecoveryImpl(
+        File backupDir,
+        Function<Snapshot, TemporaryStorage> tempStorageFactory,
+        Storage primaryStorage,
+        DistributedSnapshotStore distributedStore,
+        Command shutDownNow) {
+
+      this.backupDir = checkNotNull(backupDir);
+      this.tempStorageFactory = checkNotNull(tempStorageFactory);
+      this.recovery = Atomics.newReference();
+      this.primaryStorage = checkNotNull(primaryStorage);
+      this.distributedStore = checkNotNull(distributedStore);
+      this.shutDownNow = checkNotNull(shutDownNow);
+    }
+
+    @Override public Set<String> listBackups() {
+      return ImmutableSet.<String>builder().add(backupDir.list()).build();
+    }
+
+    @Override public void stage(String backupName) throws RecoveryException {
+      File backupFile = new File(backupDir, backupName);
+      if (!backupFile.exists()) {
+        throw new RecoveryException("Backup " + backupName + " does not exist.");
+      }
+
+      Snapshot snapshot;
+      try {
+        snapshot = ThriftBinaryCodec.decode(Snapshot.class, Files.toByteArray(backupFile));
+      } catch (CodingException e) {
+        throw new RecoveryException("Failed to decode backup " + e, e);
+      } catch (IOException e) {
+        throw new RecoveryException("Failed to read backup " + e, e);
+      }
+      boolean applied =
+          recovery.compareAndSet(null, new PendingRecovery(tempStorageFactory.apply(snapshot)));
+      if (!applied) {
+        throw new RecoveryException("Another backup is already loaded.");
+      }
+    }
+
+    private PendingRecovery getLoadedRecovery() throws RecoveryException {
+      @Nullable PendingRecovery loaded = this.recovery.get();
+      if (loaded == null) {
+        throw new RecoveryException("No backup loaded.");
+      }
+      return loaded;
+    }
+
+    @Override public Set<IScheduledTask> query(Query.Builder query) throws RecoveryException {
+      return getLoadedRecovery().query(query);
+    }
+
+    @Override public void deleteTasks(Query.Builder query) throws RecoveryException {
+      getLoadedRecovery().delete(query);
+    }
+
+    @Override public void unload() {
+      recovery.set(null);
+    }
+
+    @Override public void commit() throws RecoveryException {
+      getLoadedRecovery().commit();
+    }
+
+    private class PendingRecovery {
+      private final TemporaryStorage tempStorage;
+
+      PendingRecovery(TemporaryStorage tempStorage) {
+        this.tempStorage = tempStorage;
+      }
+
+      void commit() {
+        primaryStorage.write(new MutateWork.NoResult.Quiet() {
+          @Override protected void execute(MutableStoreProvider storeProvider) {
+            try {
+              distributedStore.persist(tempStorage.toSnapshot());
+              shutDownNow.execute();
+            } catch (CodingException e) {
+              throw new IllegalStateException("Failed to encode snapshot.", e);
+            }
+          }
+        });
+      }
+
+      Set<IScheduledTask> query(final Query.Builder query) {
+        return tempStorage.fetchTasks(query);
+      }
+
+      void delete(final Query.Builder query) {
+        tempStorage.deleteTasks(query);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
new file mode 100644
index 0000000..3faeb1f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.backup;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+import com.google.common.io.Files;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.scheduler.storage.SnapshotStore;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A backup routine that layers over a snapshot store and periodically writes snapshots to
+ * local disk.
+ *
+ * TODO(William Farner): Perform backups asynchronously.  As written, they are performed in a
+ * blocking write operation, which is asking for trouble.
+ */
+public interface StorageBackup {
+
+  /**
+   * Perform a storage backup immediately, blocking until it is complete.
+   */
+  void backupNow();
+
+  /**
+   * {@link StorageBackup} implementation that wraps a delegate {@link SnapshotStore} and writes
+   * snapshots to files in a local backup directory, pruning the oldest files once more than the
+   * configured maximum are present.
+   */
+  class StorageBackupImpl implements StorageBackup, SnapshotStore<Snapshot> {
+    private static final Logger LOG = Logger.getLogger(StorageBackup.class.getName());
+
+    private static final String FILE_PREFIX = "scheduler-backup-";
+    private final BackupConfig config;
+
+    /**
+     * Settings for the backup routine: target directory, number of backups to retain, and the
+     * minimum interval between automatic backups.
+     */
+    static class BackupConfig {
+      private final File dir;
+      private final int maxBackups;
+      private final Amount<Long, Time> interval;
+
+      BackupConfig(File dir, int maxBackups, Amount<Long, Time> interval) {
+        this.dir = checkNotNull(dir);
+        this.maxBackups = maxBackups;
+        this.interval = checkNotNull(interval);
+      }
+
+      @VisibleForTesting
+      File getDir() {
+        return dir;
+      }
+    }
+
+    /**
+     * Binding annotation that the underlying {@link SnapshotStore} must be bound with.
+     */
+    @BindingAnnotation
+    @Target({FIELD, PARAMETER, METHOD}) @Retention(RUNTIME)
+    @interface SnapshotDelegate { }
+
+    private final SnapshotStore<Snapshot> delegate;
+    private final Clock clock;
+    private final long backupIntervalMs;
+    private volatile long lastBackupMs;
+    // SimpleDateFormat is not thread-safe; all access must synchronize on this instance.
+    private final DateFormat backupDateFormat;
+
+    private final AtomicLong successes = Stats.exportLong("scheduler_backup_success");
+    @VisibleForTesting
+    AtomicLong getSuccesses() {
+      return successes;
+    }
+
+    private final AtomicLong failures = Stats.exportLong("scheduler_backup_failed");
+    @VisibleForTesting
+    AtomicLong getFailures() {
+      return failures;
+    }
+
+    @Inject
+    StorageBackupImpl(
+        @SnapshotDelegate SnapshotStore<Snapshot> delegate,
+        Clock clock,
+        BackupConfig config) {
+
+      this.delegate = checkNotNull(delegate);
+      this.clock = checkNotNull(clock);
+      this.config = checkNotNull(config);
+      backupDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
+      backupIntervalMs = config.interval.as(Time.MILLISECONDS);
+      // Start the interval at construction time, so the first automatic backup is taken only
+      // after a full interval has elapsed.
+      lastBackupMs = clock.nowMillis();
+    }
+
+    /**
+     * Creates a snapshot via the delegate, also saving it as a backup when at least the backup
+     * interval has elapsed since the last backup attempt.
+     *
+     * @return The snapshot created by the delegate.
+     */
+    @Override public Snapshot createSnapshot() {
+      Snapshot snapshot = delegate.createSnapshot();
+      if (clock.nowMillis() >= (lastBackupMs + backupIntervalMs)) {
+        save(snapshot);
+      }
+      return snapshot;
+    }
+
+    @Override public void backupNow() {
+      save(delegate.createSnapshot());
+    }
+
+    /**
+     * Builds the backup file name for the current clock time.
+     *
+     * @return The file prefix followed by the current time, formatted to minute resolution.
+     */
+    @VisibleForTesting
+    String createBackupName() {
+      Date now = new Date(clock.nowMillis());
+      // Guard the shared formatter, since backups may be requested from more than one thread.
+      synchronized (backupDateFormat) {
+        return FILE_PREFIX + backupDateFormat.format(now);
+      }
+    }
+
+    /**
+     * Writes a snapshot to the backup directory and prunes outdated backups.
+     * <p>
+     * The snapshot is written to a temporary file first and then moved to its final name.
+     * Failures are logged and counted rather than propagated to the caller.
+     *
+     * @param snapshot Snapshot to back up.
+     */
+    private void save(Snapshot snapshot) {
+      // Recorded before the attempt, so a failed backup is not retried until the next interval.
+      lastBackupMs = clock.nowMillis();
+
+      String backupName = createBackupName();
+      String tempBackupName = "temp_" + backupName;
+      File tempFile = new File(config.dir, tempBackupName);
+      LOG.info("Saving backup to " + tempFile);
+      try {
+        byte[] backup = ThriftBinaryCodec.encodeNonNull(snapshot);
+        Files.write(backup, tempFile);
+        Files.move(tempFile, new File(config.dir, backupName));
+        successes.incrementAndGet();
+      } catch (IOException e) {
+        failures.incrementAndGet();
+        LOG.log(Level.SEVERE, "Failed to prepare backup " + backupName + ": " + e, e);
+      } catch (CodingException e) {
+        failures.incrementAndGet();
+        LOG.log(Level.SEVERE, "Failed to encode backup " + backupName + ": " + e, e);
+      } finally {
+        // The temp file only remains if the write or move above did not complete.
+        if (tempFile.exists()) {
+          LOG.info("Deleting incomplete backup file " + tempFile);
+          if (!tempFile.delete()) {
+            LOG.warning("Failed to delete incomplete backup file " + tempFile);
+          }
+        }
+      }
+
+      File[] backups = config.dir.listFiles(BACKUP_FILTER);
+      if (backups == null) {
+        LOG.severe("Failed to list backup dir " + config.dir);
+      } else {
+        int backupsToDelete = backups.length - config.maxBackups;
+        if (backupsToDelete > 0) {
+          // Names embed a sortable timestamp, so the lexicographically smallest are the oldest.
+          List<File> toDelete = Ordering.natural()
+              .onResultOf(FILE_NAME)
+              .sortedCopy(ImmutableList.copyOf(backups)).subList(0, backupsToDelete);
+          LOG.info("Deleting " + backupsToDelete + " outdated backups: " + toDelete);
+          for (File outdated : toDelete) {
+            if (!outdated.delete()) {
+              LOG.warning("Failed to delete outdated backup " + outdated);
+            }
+          }
+        }
+      }
+    }
+
+    // Accepts only completed backup files; temp files carry a different prefix.
+    private static final FilenameFilter BACKUP_FILTER = new FilenameFilter() {
+      @Override public boolean accept(File file, String s) {
+        return s.startsWith(FILE_PREFIX);
+      }
+    };
+
+    @VisibleForTesting
+    static final Function<File, String> FILE_NAME = new Function<File, String>() {
+      @Override public String apply(File file) {
+        return file.getName();
+      }
+    };
+
+    @Override
+    public void applySnapshot(Snapshot snapshot) {
+      delegate.applySnapshot(snapshot);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
new file mode 100644
index 0000000..e0906fe
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.backup;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.SnapshotStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import com.twitter.aurora.scheduler.storage.mem.MemStorage;
+import com.twitter.common.util.testing.FakeClock;
+
+/**
+ * A short-lived in-memory storage system that can be converted to a {@link Snapshot}.
+ */
+interface TemporaryStorage {
+
+  /**
+   * Deletes all tasks matching a query.  Deleted tasks will not be reflected in the snapshot when
+   * {@link #toSnapshot()} is executed.
+   *
+   * @param query Query builder for tasks to delete.
+   */
+  void deleteTasks(Query.Builder query);
+
+  /**
+   * Fetches tasks matching a query.
+   *
+   * @param query Query builder for tasks to fetch.
+   * @return Matching tasks.
+   */
+  Set<IScheduledTask> fetchTasks(Query.Builder query);
+
+  /**
+   * Creates a snapshot of the contents of the temporary storage.
+   *
+   * @return Temporary storage snapshot.
+   */
+  Snapshot toSnapshot();
+
+  /**
+   * A factory that creates temporary storage instances, detached from the rest of the system.
+   * <p>
+   * Each instance is backed by a new, empty {@link MemStorage} to which the supplied snapshot
+   * is applied.
+   */
+  class TemporaryStorageFactory implements Function<Snapshot, TemporaryStorage> {
+    @Override public TemporaryStorage apply(Snapshot snapshot) {
+      Storage detachedStorage = MemStorage.newEmptyStorage();
+      // The snapshot store's clock is pinned to the timestamp recorded in the source snapshot.
+      FakeClock snapshotClock = new FakeClock();
+      snapshotClock.setNowMillis(snapshot.getTimestamp());
+      SnapshotStore<Snapshot> store = new SnapshotStoreImpl(snapshotClock, detachedStorage);
+      store.applySnapshot(snapshot);
+      return new SnapshotBackedStorage(detachedStorage, store);
+    }
+
+    /**
+     * Temporary storage that serves queries and deletions from a detached storage, and
+     * produces snapshots through the snapshot store layered over it.
+     */
+    private static final class SnapshotBackedStorage implements TemporaryStorage {
+      private final Storage storage;
+      private final SnapshotStore<Snapshot> snapshotStore;
+
+      SnapshotBackedStorage(Storage storage, SnapshotStore<Snapshot> snapshotStore) {
+        this.storage = storage;
+        this.snapshotStore = snapshotStore;
+      }
+
+      @Override public void deleteTasks(final Query.Builder query) {
+        storage.write(new MutateWork.NoResult.Quiet() {
+          @Override protected void execute(MutableStoreProvider storeProvider) {
+            // Resolve the query to task IDs first, then delete by ID.
+            Set<IScheduledTask> matching = storeProvider.getTaskStore().fetchTasks(query);
+            Set<String> ids = FluentIterable.from(matching)
+                .transform(Tasks.SCHEDULED_TO_ID)
+                .toSet();
+            storeProvider.getUnsafeTaskStore().deleteTasks(ids);
+          }
+        });
+      }
+
+      @Override public Set<IScheduledTask> fetchTasks(final Query.Builder query) {
+        return storage.consistentRead(new Work.Quiet<Set<IScheduledTask>>() {
+          @Override public Set<IScheduledTask> apply(StoreProvider storeProvider) {
+            return storeProvider.getTaskStore().fetchTasks(query);
+          }
+        });
+      }
+
+      @Override public Snapshot toSnapshot() {
+        return snapshotStore.createSnapshot();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java b/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java
new file mode 100644
index 0000000..74e8c07
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.log;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.storage.LogEntry;
+import com.twitter.aurora.gen.storage.LogEntry._Fields;
+import com.twitter.common.stats.Stats;
+
+/**
+ * Utility class for working with log entries.
+ */
+final class Entries {
+
+  private static final Logger LOG = Logger.getLogger(Entries.class.getName());
+
+  // Running total of bytes saved by compression; decreases when an entry grows when deflated.
+  private static final AtomicLong COMPRESSION_BYTES_SAVED =
+      Stats.exportLong("log_compressed_entry_bytes_saved");
+
+  private Entries() {
+    // Utility class.
+  }
+
+  /**
+   * Deflates a log entry and wraps it in a deflated entry.
+   * <p>
+   * This will encode the entry using the thrift binary codec, and will apply deflate compression to
+   * the resulting encoded data.
+   * <p>
+   * This operation is symmetric with {@link #inflate(LogEntry)}.
+   *
+   * @param entry Entry to deflate.
+   * @return An entry with the {@code deflatedEntry} field set with the deflated serialized value
+   *         of the original entry.
+   * @throws CodingException If the value could not be encoded or deflated.
+   */
+  static LogEntry deflate(LogEntry entry) throws CodingException {
+    byte[] data = thriftBinaryEncode(entry);
+    int initialLength = data.length;
+    LOG.info("Deflating log entry of size " + initialLength);
+    ByteArrayOutputStream deflated = new ByteArrayOutputStream();
+    DeflaterOutputStream deflater = new DeflaterOutputStream(deflated);
+    try {
+      try {
+        deflater.write(data);
+      } finally {
+        // close() writes any remaining compressed data, and must run even if write() fails
+        // so that the stream is not left open.
+        deflater.close();
+      }
+      byte[] deflatedData = deflated.toByteArray();
+      int bytesSaved = initialLength - deflatedData.length;
+      if (bytesSaved < 0) {
+        LOG.warning("Deflated entry is larger than original by " + (bytesSaved * -1) + " bytes");
+      } else {
+        LOG.info("Deflated log entry size: " + deflatedData.length + " (saved " + bytesSaved + ")");
+      }
+
+      COMPRESSION_BYTES_SAVED.addAndGet(bytesSaved);
+      return LogEntry.deflatedEntry(ByteBuffer.wrap(deflatedData));
+    } catch (IOException e) {
+      throw new CodingException("Failed to deflate snapshot: " + e, e);
+    }
+  }
+
+  /**
+   * Inflates and deserializes a deflated log entry.
+   * <p>
+   * This requires that the {@code deflatedEntry} field is set on the provided {@code entry}.
+   * The encoded value will be inflated and deserialized as a {@link LogEntry}.
+   *
+   * @param entry Entry to inflate, which must be a deflated entry.
+   * @return The inflated entry.
+   * @throws CodingException If the value could not be inflated or decoded.
+   */
+  static LogEntry inflate(LogEntry entry) throws CodingException {
+    Preconditions.checkArgument(entry.isSet(_Fields.DEFLATED_ENTRY));
+
+    ByteArrayOutputStream inflated = new ByteArrayOutputStream();
+    ByteBuffer data = entry.bufferForDeflatedEntry();
+    LOG.info("Inflating deflated log entry of size " + data.remaining());
+    // The buffer's position is relative to arrayOffset() within the backing array, so both
+    // must be added to locate the first readable byte.
+    InflaterInputStream inflater = new InflaterInputStream(new ByteArrayInputStream(
+        data.array(), data.arrayOffset() + data.position(), data.remaining()));
+    try {
+      try {
+        ByteStreams.copy(inflater, inflated);
+      } finally {
+        // Close the stream whether or not the copy succeeded.
+        inflater.close();
+      }
+      byte[] inflatedData = inflated.toByteArray();
+      LOG.info("Inflated log entry size: " + inflatedData.length);
+      return thriftBinaryDecode(inflatedData);
+    } catch (IOException e) {
+      throw new CodingException("Failed to inflate compressed log entry.", e);
+    }
+  }
+
+  /**
+   * Thrift binary-encodes a log entry.
+   *
+   * @param entry The entry to encode.
+   * @return The serialized entry value.
+   * @throws CodingException If the entry could not be encoded.
+   */
+  static byte[] thriftBinaryEncode(LogEntry entry) throws CodingException {
+    return ThriftBinaryCodec.encodeNonNull(entry);
+  }
+
+  /**
+   * Decodes a byte array containing thrift binary-encoded data.
+   *
+   * @param contents The data to decode.
+   * @return The deserialized entry.
+   * @throws CodingException If the entry could not be deserialized.
+   */
+  static LogEntry thriftBinaryDecode(byte[] contents) throws CodingException {
+    return ThriftBinaryCodec.decodeNonNull(LogEntry.class, contents);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
new file mode 100644
index 0000000..da29401
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
@@ -0,0 +1,516 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.log;
+
+import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Bytes;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.gen.storage.Frame;
+import com.twitter.aurora.gen.storage.FrameChunk;
+import com.twitter.aurora.gen.storage.FrameHeader;
+import com.twitter.aurora.gen.storage.LogEntry;
+import com.twitter.aurora.gen.storage.LogEntry._Fields;
+import com.twitter.aurora.gen.storage.Op;
+import com.twitter.aurora.gen.storage.RemoveTasks;
+import com.twitter.aurora.gen.storage.SaveHostAttributes;
+import com.twitter.aurora.gen.storage.SaveTasks;
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.gen.storage.Transaction;
+import com.twitter.aurora.gen.storage.storageConstants;
+import com.twitter.aurora.scheduler.log.Log;
+import com.twitter.aurora.scheduler.log.Log.Entry;
+import com.twitter.aurora.scheduler.log.Log.Position;
+import com.twitter.aurora.scheduler.log.Log.Stream;
+import com.twitter.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import com.twitter.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.stats.Stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Manages opening, reading from and writing to a {@link Log}.
+ */
+public final class LogManager {
+
+  /**
+   * Binding annotation identifying the maximum log entry size to permit before chunking entries
+   * into frames.  It qualifies the {@code Amount<Integer, Data>} parameter of the
+   * {@link LogManager} constructor.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ ElementType.PARAMETER, ElementType.METHOD })
+  @BindingAnnotation
+  public @interface MaxEntrySize { }
+
+  /**
+   * Binding annotation for settings regarding the way snapshots are written.  It qualifies the
+   * {@code boolean} parameter of the {@link LogManager} constructor that selects whether
+   * snapshot entries are deflated before being appended to the log.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ ElementType.PARAMETER, ElementType.METHOD })
+  @BindingAnnotation
+  public @interface SnapshotSetting { }
+
+  private static final Logger LOG = Logger.getLogger(LogManager.class.getName());
+
+  private final Log log;
+  private final Amount<Integer, Data> maxEntrySize;
+  private final boolean deflateSnapshots;
+  private final ShutdownRegistry shutdownRegistry;
+
+  /**
+   * Creates a manager for the given log.
+   *
+   * @param log The log to open streams on.
+   * @param maxEntrySize Largest serialized entry to write before splitting it into frames.
+   * @param deflateSnapshots Whether snapshot entries are deflated before being written.
+   * @param shutdownRegistry Registry that receives an action closing each opened stream.
+   * @throws NullPointerException If any reference argument is {@code null}.
+   */
+  @Inject
+  LogManager(
+      Log log,
+      @MaxEntrySize Amount<Integer, Data> maxEntrySize,
+      @SnapshotSetting boolean deflateSnapshots,
+      ShutdownRegistry shutdownRegistry) {
+
+    this.log = Preconditions.checkNotNull(log);
+    this.maxEntrySize = Preconditions.checkNotNull(maxEntrySize);
+    this.deflateSnapshots = deflateSnapshots;
+    this.shutdownRegistry = Preconditions.checkNotNull(shutdownRegistry);
+  }
+
+  /**
+   * Opens the underlying log and wraps the resulting stream in a {@link StreamManager}.
+   *
+   * <p>Each call also registers a shutdown action that closes the stream it opened.
+   *
+   * @return A stream manager for reading from and writing to the opened stream.
+   * @throws IOException If the log could not be opened.
+   */
+  public StreamManager open() throws IOException {
+    final Stream opened = log.open();
+
+    // Make sure the stream is released when the application shuts down.
+    shutdownRegistry.addAction(new ExceptionalCommand<IOException>() {
+      @Override public void execute() throws IOException {
+        opened.close();
+      }
+    });
+
+    return new StreamManager(opened, deflateSnapshots, maxEntrySize);
+  }
+
+  /**
+   * Manages interaction with the log stream.  Log entries can be
+   * {@link #readFromBeginning(com.twitter.common.base.Closure) read from} the beginning,
+   * a {@link #startTransaction() transaction} consisting of one or more local storage
+   * operations can be committed atomically, or the log can be compacted by
+   * {@link #snapshot(com.twitter.aurora.gen.storage.Snapshot) snapshotting}.
+   */
+  public static class StreamManager {
+
+    private static MessageDigest createDigest() {
+      try {
+        return MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new IllegalStateException("Could not find provider for standard algorithm 'MD5'", e);
+      }
+    }
+
+    /**
+     * Counters exported through {@link Stats}, each under the name passed to its export call.
+     */
+    private static class Vars {
+      // Incremented on each transaction commit that writes an entry; reset to 0 by snapshot().
+      private final AtomicInteger unSnapshottedTransactions =
+          Stats.exportInt("scheduler_log_un_snapshotted_transactions");
+      // Total size of the serialized sub-entries appended to the stream.
+      private final AtomicLong bytesWritten = Stats.exportLong("scheduler_log_bytes_written");
+      // Incremented once per logical entry appended, regardless of how many frames it spans.
+      private final AtomicLong entriesWritten = Stats.exportLong("scheduler_log_entries_written");
+      // Incremented whenever a chunk sequence is found to be incomplete or interrupted.
+      private final AtomicLong badFramesRead = Stats.exportLong("scheduler_log_bad_frames_read");
+      // Total size of the raw stream entries decoded while reading.
+      private final AtomicLong bytesRead = Stats.exportLong("scheduler_log_bytes_read");
+      // Incremented once per entry handed to the reader.
+      private final AtomicLong entriesRead = Stats.exportLong("scheduler_log_entries_read");
+      // Incremented once per entry that had to be inflated before being handed to the reader.
+      private final AtomicLong deflatedEntriesRead =
+          Stats.exportLong("scheduler_log_deflated_entries_read");
+      // Incremented once per snapshot appended to the log.
+      private final AtomicLong snapshots = Stats.exportLong("scheduler_log_snapshots");
+    }
+    private final Vars vars = new Vars();
+
+    private final Object writeMutex = new Object();
+    private final Stream stream;
+    private final boolean deflateSnapshots;
+    private final MessageDigest digest;
+    private final EntrySerializer entrySerializer;
+
+    /**
+     * Creates a manager for an already-opened log stream.
+     *
+     * @param stream The stream to read from and append to.
+     * @param deflateSnapshots Whether snapshot entries are deflated before being written.
+     * @param maxEntrySize Largest serialized entry to write before splitting it into frames.
+     */
+    StreamManager(Stream stream, boolean deflateSnapshots, Amount<Integer, Data> maxEntrySize) {
+      this.stream = Preconditions.checkNotNull(stream);
+      this.deflateSnapshots = deflateSnapshots;
+      // The same digest instance is used for verifying frames on read and checksumming on write.
+      this.digest = createDigest();
+      this.entrySerializer = new EntrySerializer(this.digest, maxEntrySize);
+    }
+
+    /**
+     * Reads every entry in the log stream, from the beginning, and hands each decoded log entry
+     * to the supplied reader.
+     *
+     * <p>Entries that were written as a header frame followed by chunk frames are reassembled
+     * before being passed on, and deflated entries are inflated first.  Frame sequences that
+     * cannot be reassembled are skipped.
+     *
+     * @param reader A reader that will be handed log entries decoded from the stream.
+     * @throws CodingException if there was a problem decoding a log entry from the stream.
+     * @throws InvalidPositionException declared for the underlying stream; NOTE(review): this
+     *     method takes no position, confirm when this can actually be raised.
+     * @throws StreamAccessException if there is a problem reading from the log.
+     */
+    public void readFromBeginning(Closure<LogEntry> reader)
+        throws CodingException, InvalidPositionException, StreamAccessException {
+
+      Iterator<Entry> entries = stream.readAll();
+
+      while (entries.hasNext()) {
+        LogEntry decoded = decodeLogEntry(entries.next());
+        // A frame stands in for a chunked entry; keep reassembling until a plain entry (or
+        // nothing usable) comes back.
+        while (decoded != null && isFrame(decoded)) {
+          decoded = tryDecodeFrame(decoded.getFrame(), entries);
+        }
+        if (decoded == null) {
+          continue;
+        }
+
+        if (decoded.isSet(_Fields.DEFLATED_ENTRY)) {
+          decoded = Entries.inflate(decoded);
+          vars.deflatedEntriesRead.incrementAndGet();
+        }
+
+        reader.execute(decoded);
+        vars.entriesRead.incrementAndGet();
+      }
+    }
+
+    /**
+     * Attempts to reassemble a chunked log entry, given the frame that should be its header.
+     *
+     * <p>The number of chunks announced by the header is consumed from {@code entries}.  If the
+     * sequence is cut short, the bad frame is counted and reassembly is abandoned.
+     *
+     * @param frame The frame that was just read; expected to be a header.
+     * @param entries The remaining stream entries, positioned immediately after {@code frame}.
+     * @return The reassembled entry.  {@code null} if {@code frame} is not a header or the
+     *     stream ended before all chunks were read.  If a non-chunk entry interrupted the
+     *     sequence, that entry is returned so the caller can process it.
+     * @throws CodingException If the header declares a negative chunk count, an entry could not
+     *     be decoded, or the reassembled data fails its checksum.
+     */
+    @Nullable
+    private LogEntry tryDecodeFrame(Frame frame, Iterator<Entry> entries) throws CodingException {
+      if (!isHeader(frame)) {
+        LOG.warning("Found a frame with no preceding header, skipping.");
+        return null;
+      }
+      FrameHeader header = frame.getHeader();
+      if (header.chunkCount < 0) {
+        // A corrupt header would otherwise surface as an unchecked NegativeArraySizeException
+        // from the array allocation below.
+        throw new CodingException(
+            "Read a frame header with a negative chunk count: " + header.chunkCount);
+      }
+      byte[][] chunks = new byte[header.chunkCount][];
+
+      digest.reset();
+      for (int i = 0; i < header.chunkCount; i++) {
+        if (!entries.hasNext()) {
+          logBadFrame(header, i);
+          return null;
+        }
+        LogEntry logEntry = decodeLogEntry(entries.next());
+        if (!isFrame(logEntry)) {
+          // The entry has already been consumed from the iterator, so hand it back.
+          logBadFrame(header, i);
+          return logEntry;
+        }
+        Frame chunkFrame = logEntry.getFrame();
+        if (!isChunk(chunkFrame)) {
+          // Likely the header of a newer frame sequence; the caller will retry with it.
+          logBadFrame(header, i);
+          return logEntry;
+        }
+        byte[] chunkData = chunkFrame.getChunk().getData();
+        digest.update(chunkData);
+        chunks[i] = chunkData;
+      }
+      if (!Arrays.equals(header.getChecksum(), digest.digest())) {
+        throw new CodingException("Read back a framed log entry that failed its checksum");
+      }
+      return Entries.thriftBinaryDecode(Bytes.concat(chunks));
+    }
+
+    /** Tells whether the given log entry's set field is {@code FRAME}. */
+    private static boolean isFrame(LogEntry logEntry) {
+      return logEntry.isSet(LogEntry._Fields.FRAME);
+    }
+
+    /** Tells whether the given frame's set field is {@code CHUNK}. */
+    private static boolean isChunk(Frame frame) {
+      return frame.isSet(Frame._Fields.CHUNK);
+    }
+
+    /** Tells whether the given frame's set field is {@code HEADER}. */
+    private static boolean isHeader(Frame frame) {
+      return frame.isSet(Frame._Fields.HEADER);
+    }
+
+    /**
+     * Logs an incomplete frame sequence and counts it in the bad-frames stat.
+     *
+     * @param header The header that announced the sequence.
+     * @param chunkIndex The number of chunks that were read before the sequence broke off.
+     */
+    private void logBadFrame(FrameHeader header, int chunkIndex) {
+      String message = String.format(
+          "Found an aborted transaction, required %d frames and found %d",
+          header.chunkCount,
+          chunkIndex);
+      LOG.info(message);
+      vars.badFramesRead.incrementAndGet();
+    }
+
+    /**
+     * Decodes a raw stream entry into a log entry, adding its size to the bytes-read stat.
+     *
+     * @param entry The raw entry read from the stream.
+     * @return The decoded log entry.
+     * @throws CodingException If the entry contents could not be decoded.
+     */
+    private LogEntry decodeLogEntry(Entry entry) throws CodingException {
+      byte[] serialized = entry.contents();
+      int size = serialized.length;
+      // Count the bytes before decoding so undecodable entries are still accounted for.
+      vars.bytesRead.addAndGet(size);
+      return Entries.thriftBinaryDecode(serialized);
+    }
+
+    /**
+     * Truncates all entries in the log stream occurring before the given position.  The entry at
+     * the given position becomes the first entry in the stream when this call completes.
+     *
+     * <p>This delegates directly to the underlying stream.
+     *
+     * @param position The position of the earliest entry to keep in the stream.
+     * @throws InvalidPositionException if the specified position does not exist in this log.
+     * @throws StreamAccessException if the stream could not be truncated.
+     */
+    void truncateBefore(Position position) {
+      stream.truncateBefore(position);
+    }
+
+    /**
+     * Begins a new transaction for batching {@link Op}s that are later committed to the log
+     * stream as one atomic unit.
+     *
+     * @return A fresh, uncommitted transaction bound to this stream manager.
+     */
+    StreamTransaction startTransaction() {
+      StreamTransaction transaction = new StreamTransaction();
+      return transaction;
+    }
+
+    /**
+     * Appends a snapshot to the log and, once that succeeds, truncates the log entries that
+     * precede it.  The snapshot entry is deflated first when this manager is configured to do so.
+     *
+     * @param snapshot The snapshot to add.
+     * @throws CodingException if there was a problem encoding the snapshot into a log entry.
+     * @throws InvalidPositionException if there was a problem truncating before the snapshot.
+     * @throws StreamAccessException if there was a problem appending the snapshot to the log.
+     */
+    @Timed("log_manager_snapshot")
+    void snapshot(Snapshot snapshot)
+        throws CodingException, InvalidPositionException, StreamAccessException {
+
+      LogEntry snapshotEntry = LogEntry.snapshot(snapshot);
+      LogEntry toWrite = deflateSnapshots ? Entries.deflate(snapshotEntry) : snapshotEntry;
+
+      Position snapshotPosition = appendAndGetPosition(toWrite);
+      vars.snapshots.incrementAndGet();
+      // Everything before the snapshot is now redundant, so the backlog counter starts over.
+      vars.unSnapshottedTransactions.set(0);
+      stream.truncateBefore(snapshotPosition);
+    }
+
+    /**
+     * Serializes a log entry and appends it to the stream, as several consecutive sub-entries
+     * when the serialized form has to be split into frames.
+     *
+     * @param logEntry The entry to append.
+     * @return The position of the first sub-entry that was appended.
+     * @throws CodingException If the entry could not be serialized.
+     */
+    @Timed("log_manager_append")
+    private Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
+      Position firstPosition = null;
+      synchronized (writeMutex) { // ensure all sub-entries are written as a unit
+        // Serialize under the mutex too: the serializer checksums oversized entries with the
+        // MessageDigest instance it shares with this manager, and that object is stateful, so
+        // concurrent appenders must not interleave on it.
+        // NOTE(review): tryDecodeFrame uses the same digest without holding this mutex.
+        byte[][] entries = entrySerializer.serialize(logEntry);
+        for (byte[] entry : entries) {
+          Position position = stream.append(entry);
+          if (firstPosition == null) {
+            firstPosition = position;
+          }
+          vars.bytesWritten.addAndGet(entry.length);
+        }
+      }
+      vars.entriesWritten.incrementAndGet();
+      return firstPosition;
+    }
+
+    /**
+     * Serializes log entries, splitting any entry whose encoded form exceeds the configured
+     * maximum size into a header frame followed by chunk frames.
+     *
+     * <p>Checksums are computed with a single {@link MessageDigest} instance held by this
+     * object, so calls to {@link #serialize(LogEntry)} must not run concurrently on one
+     * instance.
+     */
+    @VisibleForTesting
+    public static class EntrySerializer {
+      private final MessageDigest digest;
+      private final int maxEntrySizeBytes;
+
+      private EntrySerializer(MessageDigest digest, Amount<Integer, Data> maxEntrySize) {
+        this.digest = checkNotNull(digest);
+        maxEntrySizeBytes = maxEntrySize.as(Data.BYTES);
+        // A non-positive limit cannot be chunked against: fail here rather than with a bogus
+        // chunk count the first time an entry is serialized.
+        Preconditions.checkArgument(maxEntrySizeBytes > 0,
+            "Max entry size must be positive, got %s bytes", maxEntrySizeBytes);
+      }
+
+      /**
+       * Creates a serializer with its own digest.
+       *
+       * @param maxEntrySize Largest serialized entry to emit unsplit; must be positive.
+       * @throws IllegalArgumentException If {@code maxEntrySize} is not positive.
+       */
+      public EntrySerializer(Amount<Integer, Data> maxEntrySize) {
+        this(createDigest(), maxEntrySize);
+      }
+
+      /**
+       * Serializes a log entry and splits it into chunks no larger than {@code maxEntrySizeBytes}.
+       *
+       * <p>An entry that fits within the limit is returned as a single element.  Otherwise the
+       * result holds an encoded header frame (chunk count and checksum of the whole entry)
+       * followed by one encoded chunk frame per chunk.
+       *
+       * @param logEntry The log entry to serialize.
+       * @return Serialized and chunked log entry.
+       * @throws CodingException If the entry could not be serialized.
+       */
+      @VisibleForTesting
+      public byte[][] serialize(LogEntry logEntry) throws CodingException {
+        byte[] entry = Entries.thriftBinaryEncode(logEntry);
+        if (entry.length <= maxEntrySizeBytes) {
+          return new byte[][] {entry};
+        }
+
+        // Ceiling division in integer arithmetic; entry.length > maxEntrySizeBytes >= 1 here.
+        int chunks = (entry.length - 1) / maxEntrySizeBytes + 1;
+        byte[][] frames = new byte[chunks + 1][];
+
+        frames[0] = encode(Frame.header(new FrameHeader(chunks, ByteBuffer.wrap(checksum(entry)))));
+        for (int i = 0; i < chunks; i++) {
+          int offset = i * maxEntrySizeBytes;
+          // The final chunk carries whatever is left over and may be shorter than the limit.
+          ByteBuffer chunk =
+              ByteBuffer.wrap(entry, offset, Math.min(maxEntrySizeBytes, entry.length - offset));
+          frames[i + 1] = encode(Frame.chunk(new FrameChunk(chunk)));
+        }
+        return frames;
+      }
+
+      /** Computes the digest of {@code data}, discarding any state left in the digest. */
+      private byte[] checksum(byte[] data) {
+        digest.reset();
+        return digest.digest(data);
+      }
+
+      /** Wraps a frame in a log entry and encodes it. */
+      private static byte[] encode(Frame frame) throws CodingException {
+        return Entries.thriftBinaryEncode(LogEntry.frame(frame));
+      }
+    }
+
+    /**
+     * Manages a single log stream append transaction.  Local storage ops can be added to the
+     * transaction and then later committed as an atomic unit.
+     */
+    final class StreamTransaction {
+      private final Transaction transaction =
+          new Transaction().setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION);
+      private final AtomicBoolean committed = new AtomicBoolean(false);
+
+      private StreamTransaction() {
+        // Private: instances are supplied by the enclosing StreamManager's startTransaction()
+        // factory method.
+      }
+
+      /**
+       * Appends any ops that have been added to this transaction to the log stream in a single
+       * atomic record.  May be called at most once per transaction.
+       *
+       * @return The position of the log entry committed in this transaction, or {@code null} if
+       *     no ops were added and therefore nothing was written.
+       * @throws CodingException If there was a problem encoding a log entry for commit.
+       * @throws IllegalStateException If this transaction has already been committed.
+       */
+      @Nullable
+      Position commit() throws CodingException {
+        Preconditions.checkState(!committed.getAndSet(true),
+            "Can only call commit once per transaction.");
+
+        // An empty transaction is not written to the log at all.
+        if (!transaction.isSetOps()) {
+          return null;
+        }
+
+        Position position = appendAndGetPosition(LogEntry.transaction(transaction));
+        vars.unSnapshottedTransactions.incrementAndGet();
+        return position;
+      }
+
+      /**
+       * Queues a local storage operation in this transaction.  The op is merged into the most
+       * recently added op when the two can be coalesced; otherwise it is appended.
+       *
+       * @param op The local storage op to add.
+       * @throws IllegalStateException If this transaction has already been committed.
+       */
+      void add(Op op) {
+        Preconditions.checkState(!committed.get());
+
+        Op last = null;
+        if (transaction.isSetOps()) {
+          last = Iterables.getLast(transaction.getOps(), null);
+        }
+
+        boolean merged = last != null && coalesce(last, op);
+        if (!merged) {
+          transaction.addToOps(op);
+        }
+      }
+
+      /**
+       * Tries to coalesce a new op into the prior to compact the binary representation and increase
+       * batching.
+       *
+       * <p>It's recommended that as new {@code Op}s are added, they be treated here although they
+       * need not be.</p>
+       *
+       * <p>Ops are only coalesced when both have a field set and it is the same field.</p>
+       *
+       * @param prior The previous op; modified in place when coalescing succeeds.
+       * @param next The next op to be added.
+       * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
+       */
+      private boolean coalesce(Op prior, Op next) {
+        // Either op being unset rules out coalescing.  Checking both individually also keeps
+        // priorType non-null below.
+        if (!prior.isSet() || !next.isSet()) {
+          return false;
+        }
+
+        Op._Fields priorType = prior.getSetField();
+        if (!priorType.equals(next.getSetField())) {
+          return false;
+        }
+
+        switch (priorType) {
+          case SAVE_FRAMEWORK_ID:
+            // Only the most recent value matters.
+            prior.setSaveFrameworkId(next.getSaveFrameworkId());
+            return true;
+
+          case SAVE_ACCEPTED_JOB:
+          case REMOVE_JOB:
+          case SAVE_QUOTA:
+          case REMOVE_QUOTA:
+            return false;
+
+          case SAVE_TASKS:
+            coalesce(prior.getSaveTasks(), next.getSaveTasks());
+            return true;
+          case REMOVE_TASKS:
+            coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
+            return true;
+          case SAVE_HOST_ATTRIBUTES:
+            return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
+          default:
+            LOG.warning("Unoptimized op: " + priorType);
+            return false;
+        }
+      }
+
+      /**
+       * Merges the tasks of {@code next} into {@code prior}.  When both ops carry a task with
+       * the same task ID, the definition from {@code next} replaces the one from {@code prior}.
+       *
+       * @param prior The earlier op; updated in place.
+       * @param next The later op whose tasks are folded in.
+       */
+      private void coalesce(SaveTasks prior, SaveTasks next) {
+        if (!next.isSetTasks()) {
+          return;
+        }
+        if (!prior.isSetTasks()) {
+          prior.setTasks(next.getTasks());
+          return;
+        }
+
+        // It is an expected invariant that an operation may reference a task (identified by
+        // task ID) no more than one time.  Visiting prior's tasks before next's lets the most
+        // recent definition overwrite the earlier one.
+        Map<String, ScheduledTask> latestById = Maps.newHashMap();
+        for (ScheduledTask task : Iterables.concat(prior.getTasks(), next.getTasks())) {
+          latestById.put(task.getAssignedTask().getTaskId(), task);
+        }
+        prior.setTasks(ImmutableSet.copyOf(latestById.values()));
+      }
+
+      /**
+       * Merges the task IDs of {@code next} into {@code prior}, so that the prior op removes the
+       * union of both sets.
+       *
+       * @param prior The earlier op; updated in place.
+       * @param next The later op whose task IDs are folded in.
+       */
+      private void coalesce(RemoveTasks prior, RemoveTasks next) {
+        if (!next.isSetTaskIds()) {
+          return;
+        }
+        if (!prior.isSetTaskIds()) {
+          prior.setTaskIds(next.getTaskIds());
+          return;
+        }
+        prior.setTaskIds(
+            ImmutableSet.copyOf(Iterables.concat(prior.getTaskIds(), next.getTaskIds())));
+      }
+
+      /**
+       * Folds {@code next} into {@code prior} when both ops refer to the same host, by replacing
+       * the prior op's attributes with those of the next.
+       *
+       * @param prior The earlier op; its attributes are overwritten on success.
+       * @param next The later op supplying the replacement attributes.
+       * @return {@code true} if the hosts matched and the op was coalesced, {@code false} otherwise.
+       */
+      private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
+        if (!prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
+          return false;
+        }
+        prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
+        return true;
+      }
+    }
+  }
+}