You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/03/15 00:25:08 UTC

git commit: Refactor LogStorage to more cleanly separate replay and write-behind modes, and require explicit implementation of mutate operations.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 5d4e041fd -> 1713d0824


Refactor LogStorage to more cleanly separate replay and write-behind modes, and
require explicit implementation of mutate operations.

There are a few nice things falling out of this refactor:
- New methods on mutable store interfaces are not implicitly forwarded
  (ForwardingStore does not implement mutable stores).
- Write ahead/behind behavior is more obvious in LogStorage
  (i found the delegation by calling super tough to catch mistakes.)
- Callers with a handle on LogStorage don't have a means to invoke mutate calls
  outside of a transaction (they only get access to mutable stores in write(),
  which obviated testMutateRequiresWriteOperation)

Reviewed at https://reviews.apache.org/r/18487/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/1713d082
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/1713d082
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/1713d082

Branch: refs/heads/master
Commit: 1713d082405e0f5adbc6872fbf45e5a5dda3996d
Parents: 5d4e041
Author: Bill Farner <wf...@apache.org>
Authored: Fri Mar 14 16:24:45 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Mar 14 16:24:45 2014 -0700

----------------------------------------------------------------------
 .../scheduler/storage/ForwardingStore.java      | 157 +--------
 .../aurora/scheduler/storage/TaskStore.java     |   2 +
 .../scheduler/storage/log/LogStorage.java       | 333 +++++-------------
 .../storage/log/WriteAheadStorage.java          | 343 +++++++++++++++++++
 .../scheduler/storage/log/LogStorageTest.java   |  14 +-
 5 files changed, 457 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1713d082/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
index 8758eb7..a67f9d8 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
@@ -20,12 +20,10 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -33,7 +31,6 @@ import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -42,26 +39,23 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * an existing storage system.
  */
 public class ForwardingStore implements
-    Storage,
-    SchedulerStore.Mutable,
-    JobStore.Mutable,
-    TaskStore.Mutable,
-    LockStore.Mutable,
-    QuotaStore.Mutable,
-    AttributeStore.Mutable {
-
-  private final Storage storage;
-  private final SchedulerStore.Mutable schedulerStore;
-  private final JobStore.Mutable jobStore;
-  private final TaskStore.Mutable taskStore;
-  private final LockStore.Mutable lockStore;
-  private final QuotaStore.Mutable quotaStore;
-  private final AttributeStore.Mutable attributeStore;
+    SchedulerStore,
+    JobStore,
+    TaskStore,
+    LockStore,
+    QuotaStore,
+    AttributeStore {
+
+  private final SchedulerStore schedulerStore;
+  private final JobStore jobStore;
+  private final TaskStore taskStore;
+  private final LockStore lockStore;
+  private final QuotaStore quotaStore;
+  private final AttributeStore attributeStore;
 
   /**
    * Creats a new forwarding store that delegates to the providing default stores.
    *
-   * @param storage Delegate.
    * @param schedulerStore Delegate.
    * @param jobStore Delegate.
    * @param taskStore Delegate.
@@ -70,15 +64,13 @@ public class ForwardingStore implements
    * @param attributeStore Delegate.
    */
   public ForwardingStore(
-      Storage storage,
-      SchedulerStore.Mutable schedulerStore,
-      JobStore.Mutable jobStore,
-      TaskStore.Mutable taskStore,
-      LockStore.Mutable lockStore,
-      QuotaStore.Mutable quotaStore,
-      AttributeStore.Mutable attributeStore) {
+      SchedulerStore schedulerStore,
+      JobStore jobStore,
+      TaskStore taskStore,
+      LockStore lockStore,
+      QuotaStore quotaStore,
+      AttributeStore attributeStore) {
 
-    this.storage = checkNotNull(storage);
     this.schedulerStore = checkNotNull(schedulerStore);
     this.jobStore = checkNotNull(jobStore);
     this.taskStore = checkNotNull(taskStore);
@@ -88,29 +80,6 @@ public class ForwardingStore implements
   }
 
   @Override
-  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    return storage.consistentRead(work);
-  }
-
-  @Override
-  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
-      throws StorageException, E {
-    return storage.weaklyConsistentRead(work);
-  }
-
-  @Override
-  public <T, E extends Exception> T write(MutateWork<T, E> work)
-      throws StorageException, E {
-
-    return storage.write(work);
-  }
-
-  @Override
-  public void saveFrameworkId(String frameworkId) {
-    schedulerStore.saveFrameworkId(frameworkId);
-  }
-
-  @Override
   @Nullable
   public String fetchFrameworkId() {
     return schedulerStore.fetchFrameworkId();
@@ -127,54 +96,11 @@ public class ForwardingStore implements
   }
 
   @Override
-  public void saveAcceptedJob(String managerId, IJobConfiguration jobConfig) {
-    jobStore.saveAcceptedJob(managerId, jobConfig);
-  }
-
-  @Override
-  public void removeJob(IJobKey jobKey) {
-    jobStore.removeJob(jobKey);
-  }
-
-  @Override
-  public void deleteJobs() {
-    jobStore.deleteJobs();
-  }
-
-  @Override
   public Set<String> fetchManagerIds() {
     return jobStore.fetchManagerIds();
   }
 
   @Override
-  public void saveTasks(Set<IScheduledTask> tasks) throws IllegalStateException {
-    taskStore.saveTasks(tasks);
-  }
-
-  @Override
-  public void deleteAllTasks() {
-    taskStore.deleteAllTasks();
-  }
-
-  @Override
-  public void deleteTasks(Set<String> taskIds) {
-    taskStore.deleteTasks(taskIds);
-  }
-
-  @Override
-  public ImmutableSet<IScheduledTask> mutateTasks(
-      Query.Builder query,
-      Function<IScheduledTask, IScheduledTask> mutator) {
-
-    return taskStore.mutateTasks(query, mutator);
-  }
-
-  @Override
-  public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) {
-    return taskStore.unsafeModifyInPlace(taskId, taskConfiguration);
-  }
-
-  @Override
   public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder querySupplier) {
     return taskStore.fetchTasks(querySupplier);
   }
@@ -190,51 +116,16 @@ public class ForwardingStore implements
   }
 
   @Override
-  public void saveLock(ILock lock) {
-    lockStore.saveLock(lock);
-  }
-
-  @Override
-  public void removeLock(ILockKey lockKey) {
-    lockStore.removeLock(lockKey);
-  }
-
-  @Override
-  public void deleteLocks() {
-    lockStore.deleteLocks();
-  }
-
-  @Override
   public Map<String, IResourceAggregate> fetchQuotas() {
     return quotaStore.fetchQuotas();
   }
 
   @Override
-  public void removeQuota(String role) {
-    quotaStore.removeQuota(role);
-  }
-
-  @Override
-  public void deleteQuotas() {
-    quotaStore.deleteQuotas();
-  }
-
-  @Override
-  public void saveQuota(String role, IResourceAggregate quota) {
-    quotaStore.saveQuota(role, quota);
-  }
-
-  @Override
   public Optional<IResourceAggregate> fetchQuota(String role) {
     return quotaStore.fetchQuota(role);
   }
 
   @Override
-  public void saveHostAttributes(HostAttributes hostAttribute) {
-    attributeStore.saveHostAttributes(hostAttribute);
-  }
-
-  @Override
   public Optional<HostAttributes> getHostAttributes(String host) {
     return attributeStore.getHostAttributes(host);
   }
@@ -243,14 +134,4 @@ public class ForwardingStore implements
   public Set<HostAttributes> getHostAttributes() {
     return attributeStore.getHostAttributes();
   }
-
-  @Override
-  public void deleteHostAttributes() {
-    attributeStore.deleteHostAttributes();
-  }
-
-  @Override
-  public boolean setMaintenanceMode(String host, MaintenanceMode mode) {
-    return attributeStore.setMaintenanceMode(host, mode);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1713d082/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 3d0ff2d..4dcc5ae 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -58,6 +58,8 @@ public interface TaskStore {
 
     /**
      * Removes all tasks from the store.
+     * TODO(wfarner): Move this and other mass-delete methods to an interface that is only
+     * accessible by SnapshotStoreImpl.
      */
     void deleteAllTasks();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1713d082/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 3ccf800..a1a0b42 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -21,8 +21,6 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 import java.util.Date;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -32,12 +30,6 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
 import com.google.inject.BindingAnnotation;
 import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.base.Closure;
@@ -47,30 +39,17 @@ import com.twitter.common.quantity.Time;
 import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveLock;
-import org.apache.aurora.gen.storage.RemoveQuota;
-import org.apache.aurora.gen.storage.RemoveTasks;
 import org.apache.aurora.gen.storage.RewriteTask;
 import org.apache.aurora.gen.storage.SaveAcceptedJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveLock;
 import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
 import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
 import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
-import org.apache.aurora.scheduler.storage.ForwardingStore;
 import org.apache.aurora.scheduler.storage.JobStore;
 import org.apache.aurora.scheduler.storage.LockStore;
 import org.apache.aurora.scheduler.storage.QuotaStore;
@@ -127,8 +106,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * <p>If the op fails to apply to local storage we will never write the op to the log and if the op
  * fails to apply to the log, it'll throw and abort the local storage transaction as well.
  */
-public class LogStorage extends ForwardingStore
-    implements NonVolatileStorage, DistributedSnapshotStore {
+public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore {
 
   /**
    * A service that can schedule an action to be executed periodically.
@@ -145,6 +123,27 @@ public class LogStorage extends ForwardingStore
     void doEvery(Amount<Long, Time> interval, Runnable action);
   }
 
+  /**
+   * A maintainer for context about open transactions. Assumes that an external entity is
+   * responsible for opening and closing transactions.
+   */
+  interface TransactionManager {
+
+    /**
+     * Checks whether there is an open transaction.
+     *
+     * @return {@code true} if there is an open transaction, {@code false} otherwise.
+     */
+    boolean hasActiveTransaction();
+
+    /**
+     * Adds an operation to the existing transaction.
+     *
+     * @param op Operation to include in the existing transaction.
+     */
+    void log(Op op);
+  }
+
   private static class ScheduledExecutorSchedulingService implements SchedulingService {
     private final ScheduledExecutorService scheduledExecutor;
 
@@ -172,49 +171,24 @@ public class LogStorage extends ForwardingStore
   private final SchedulingService schedulingService;
   private final SnapshotStore<Snapshot> snapshotStore;
   private final Amount<Long, Time> snapshotInterval;
+  private final Storage writeBehindStorage;
+  private final SchedulerStore.Mutable writeBehindSchedulerStore;
+  private final JobStore.Mutable writeBehindJobStore;
+  private final TaskStore.Mutable writeBehindTaskStore;
+  private final LockStore.Mutable writeBehindLockStore;
+  private final QuotaStore.Mutable writeBehindQuotaStore;
+  private final AttributeStore.Mutable writeBehindAttributeStore;
 
   private StreamManager streamManager;
+  private final WriteAheadStorage writeAheadStorage;
 
+  // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
+  // recovering are controlled at this layer (they're all calls to Mutable store implementations).
+  // The more involved change is changing SnapshotStore to accept a Mutable store provider to
+  // avoid a call to Storage.write() when we replay a Snapshot.
   private boolean recovered = false;
   private StreamTransaction transaction = null;
 
-  private final MutableStoreProvider logStoreProvider = new MutableStoreProvider() {
-    @Override
-    public SchedulerStore.Mutable getSchedulerStore() {
-      return LogStorage.this;
-    }
-
-    @Override
-    public JobStore.Mutable getJobStore() {
-      return LogStorage.this;
-    }
-
-    @Override
-    public TaskStore getTaskStore() {
-      return LogStorage.this;
-    }
-
-    @Override
-    public TaskStore.Mutable getUnsafeTaskStore() {
-      return LogStorage.this;
-    }
-
-    @Override
-    public LockStore.Mutable getLockStore() {
-      return LogStorage.this;
-    }
-
-    @Override
-    public QuotaStore.Mutable getQuotaStore() {
-      return LogStorage.this;
-    }
-
-    @Override
-    public AttributeStore.Mutable getAttributeStore() {
-      return LogStorage.this;
-    }
-  };
-
   /**
    * Identifies the grace period to give in-process snapshots and checkpoints to complete during
    * shutdown.
@@ -273,7 +247,7 @@ public class LogStorage extends ForwardingStore
              SchedulingService schedulingService,
              SnapshotStore<Snapshot> snapshotStore,
              Amount<Long, Time> snapshotInterval,
-             Storage storage,
+             Storage delegateStorage,
              SchedulerStore.Mutable schedulerStore,
              JobStore.Mutable jobStore,
              TaskStore.Mutable taskStore,
@@ -281,11 +255,41 @@ public class LogStorage extends ForwardingStore
              QuotaStore.Mutable quotaStore,
              AttributeStore.Mutable attributeStore) {
 
-    super(storage, schedulerStore, jobStore, taskStore, lockStore, quotaStore, attributeStore);
     this.logManager = checkNotNull(logManager);
     this.schedulingService = checkNotNull(schedulingService);
     this.snapshotStore = checkNotNull(snapshotStore);
     this.snapshotInterval = checkNotNull(snapshotInterval);
+
+    // Log storage has two distinct operating modes: pre- and post-recovery.  When recovering,
+    // we write directly to the writeBehind stores since we are replaying what's already persisted.
+    // After that, all writes must succeed in the distributed log before they may be considered
+    // successful.
+    this.writeBehindStorage = checkNotNull(delegateStorage);
+    this.writeBehindSchedulerStore = checkNotNull(schedulerStore);
+    this.writeBehindJobStore = checkNotNull(jobStore);
+    this.writeBehindTaskStore = checkNotNull(taskStore);
+    this.writeBehindLockStore = checkNotNull(lockStore);
+    this.writeBehindQuotaStore = checkNotNull(quotaStore);
+    this.writeBehindAttributeStore = checkNotNull(attributeStore);
+    TransactionManager transactionManager = new TransactionManager() {
+      @Override
+      public boolean hasActiveTransaction() {
+        return transaction != null;
+      }
+
+      @Override
+      public void log(Op op) {
+        transaction.add(op);
+      }
+    };
+    this.writeAheadStorage = new WriteAheadStorage(
+        transactionManager,
+        schedulerStore,
+        jobStore,
+        taskStore,
+        lockStore,
+        quotaStore,
+        attributeStore);
   }
 
   @Override
@@ -383,52 +387,57 @@ public class LogStorage extends ForwardingStore
   private void replayOp(Op op) {
     switch (op.getSetField()) {
       case SAVE_FRAMEWORK_ID:
-        saveFrameworkId(op.getSaveFrameworkId().getId());
+        writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId());
         break;
 
       case SAVE_ACCEPTED_JOB:
         SaveAcceptedJob acceptedJob = op.getSaveAcceptedJob();
-        saveAcceptedJob(
+        writeBehindJobStore.saveAcceptedJob(
             acceptedJob.getManagerId(),
             IJobConfiguration.build(acceptedJob.getJobConfig()));
         break;
 
       case REMOVE_JOB:
-        removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
+        writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
         break;
 
       case SAVE_TASKS:
-        saveTasks(IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
+        writeBehindTaskStore.saveTasks(
+            IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
         break;
 
       case REWRITE_TASK:
         RewriteTask rewriteTask = op.getRewriteTask();
-        unsafeModifyInPlace(rewriteTask.getTaskId(), ITaskConfig.build(rewriteTask.getTask()));
+        writeBehindTaskStore.unsafeModifyInPlace(
+            rewriteTask.getTaskId(),
+            ITaskConfig.build(rewriteTask.getTask()));
         break;
 
       case REMOVE_TASKS:
-        deleteTasks(op.getRemoveTasks().getTaskIds());
+        writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds());
         break;
 
       case SAVE_QUOTA:
         SaveQuota saveQuota = op.getSaveQuota();
-        saveQuota(saveQuota.getRole(), IResourceAggregate.build(saveQuota.getQuota()));
+        writeBehindQuotaStore.saveQuota(
+            saveQuota.getRole(),
+            IResourceAggregate.build(saveQuota.getQuota()));
         break;
 
       case REMOVE_QUOTA:
-        removeQuota(op.getRemoveQuota().getRole());
+        writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole());
         break;
 
       case SAVE_HOST_ATTRIBUTES:
-        saveHostAttributes(op.getSaveHostAttributes().hostAttributes);
+        writeBehindAttributeStore.saveHostAttributes(op.getSaveHostAttributes().hostAttributes);
         break;
 
       case SAVE_LOCK:
-        saveLock(ILock.build(op.getSaveLock().getLock()));
+        writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock()));
         break;
 
       case REMOVE_LOCK:
-        removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
+        writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
         break;
 
       default:
@@ -464,7 +473,7 @@ public class LogStorage extends ForwardingStore
    */
   @Timed("scheduler_log_snapshot")
   void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
-    super.write(new MutateWork.NoResult<CodingException>() {
+    write(new MutateWork.NoResult<CodingException>() {
       @Override
       protected void execute(MutableStoreProvider unused)
           throws CodingException, InvalidPositionException, StreamAccessException {
@@ -489,21 +498,21 @@ public class LogStorage extends ForwardingStore
     // We don't want to use the log when recovering from it, we just want to update the underlying
     // store - so pass mutations straight through to the underlying storage.
     if (!recovered) {
-      return super.write(work);
+      return writeBehindStorage.write(work);
     }
 
     // The log stream transaction has already been set up so we just need to delegate with our
     // store provider so any mutations performed by work get logged.
     if (transaction != null) {
-      return work.apply(logStoreProvider);
+      return work.apply(writeAheadStorage);
     }
 
     transaction = streamManager.startTransaction();
     try {
-      return super.write(new MutateWork<T, E>() {
+      return writeBehindStorage.write(new MutateWork<T, E>() {
         @Override
         public T apply(MutableStoreProvider unused) throws E {
-          T result = work.apply(logStoreProvider);
+          T result = work.apply(writeAheadStorage);
           try {
             transaction.commit();
           } catch (CodingException e) {
@@ -521,164 +530,16 @@ public class LogStorage extends ForwardingStore
     }
   }
 
-  @Timed("scheduler_log_save_framework_id")
   @Override
-  public void saveFrameworkId(final String frameworkId) {
-    checkNotNull(frameworkId);
-
-    log(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
-    super.saveFrameworkId(frameworkId);
+  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
+    return writeBehindStorage.consistentRead(work);
   }
 
-  @Timed("scheduler_log_job_save")
   @Override
-  public void saveAcceptedJob(final String managerId, final IJobConfiguration jobConfig) {
-    checkNotNull(managerId);
-    checkNotNull(jobConfig);
-
-    log(Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())));
-    super.saveAcceptedJob(managerId, jobConfig);
-  }
-
-  @Timed("scheduler_log_job_remove")
-  @Override
-  public void removeJob(final IJobKey jobKey) {
-    checkNotNull(jobKey);
-
-    log(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
-    super.removeJob(jobKey);
-  }
-
-  @Timed("scheduler_log_tasks_save")
-  @Override
-  public void saveTasks(final Set<IScheduledTask> newTasks) throws IllegalStateException {
-    checkNotNull(newTasks);
-
-    log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
-    super.saveTasks(newTasks);
-  }
-
-  @Override
-  public void deleteAllTasks() {
-    Query.Builder query = Query.unscoped();
-    Set<String> ids = FluentIterable.from(fetchTasks(query))
-        .transform(Tasks.SCHEDULED_TO_ID)
-        .toSet();
-    deleteTasks(ids);
-  }
-
-  @Timed("scheduler_log_tasks_remove")
-  @Override
-  public void deleteTasks(final Set<String> taskIds) {
-    checkNotNull(taskIds);
-
-    log(Op.removeTasks(new RemoveTasks(taskIds)));
-    super.deleteTasks(taskIds);
-  }
-
-  @Timed("scheduler_log_tasks_mutate")
-  @Override
-  public ImmutableSet<IScheduledTask> mutateTasks(
-      final Query.Builder query,
-      final Function<IScheduledTask, IScheduledTask> mutator) {
-
-    checkNotNull(query);
-    checkNotNull(mutator);
-
-    ImmutableSet<IScheduledTask> mutated = super.mutateTasks(query, mutator);
-
-    Map<String, IScheduledTask> tasksById = Tasks.mapById(mutated);
-    if (LOG.isLoggable(Level.FINE)) {
-      LOG.fine("Storing updated tasks to log: "
-          + Maps.transformValues(tasksById, Tasks.GET_STATUS));
-    }
-
-    // TODO(William Farner): Avoid writing an op when mutated is empty.
-    log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))));
-    return mutated;
-  }
-
-  @Timed("scheduler_log_unsafe_modify_in_place")
-  @Override
-  public boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration) {
-    checkNotNull(taskId);
-    checkNotNull(taskConfiguration);
-
-    boolean mutated = super.unsafeModifyInPlace(taskId, taskConfiguration);
-    if (mutated) {
-      log(Op.rewriteTask(new RewriteTask(taskId, taskConfiguration.newBuilder())));
-    }
-    return mutated;
-  }
-
-  @Timed("scheduler_log_quota_remove")
-  @Override
-  public void removeQuota(final String role) {
-    checkNotNull(role);
-
-    log(Op.removeQuota(new RemoveQuota(role)));
-    super.removeQuota(role);
-  }
-
-  @Timed("scheduler_log_quota_save")
-  @Override
-  public void saveQuota(final String role, final IResourceAggregate quota) {
-    checkNotNull(role);
-    checkNotNull(quota);
-
-    log(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
-    super.saveQuota(role, quota);
-  }
-
-  @Timed("scheduler_save_host_attribute")
-  @Override
-  public void saveHostAttributes(final HostAttributes attrs) {
-    checkNotNull(attrs);
-
-    // Pass the updated attributes upstream, and then check if the stored value changes.
-    // We do this since different parts of the system write partial HostAttributes objects
-    // and they are merged together internally.
-    // TODO(William Farner): Split out a separate method
-    //                       saveAttributes(String host, Iterable<Attributes>) to simplify this.
-    Optional<HostAttributes> saved = getHostAttributes(attrs.getHost());
-    super.saveHostAttributes(attrs);
-    Optional<HostAttributes> updated = getHostAttributes(attrs.getHost());
-    if (!saved.equals(updated)) {
-      log(Op.saveHostAttributes(new SaveHostAttributes(updated.get())));
-    }
-  }
-
-  @Timed("scheduler_lock_save")
-  @Override
-  public void saveLock(final ILock lock) {
-    checkNotNull(lock);
-
-    log(Op.saveLock(new SaveLock(lock.newBuilder())));
-    super.saveLock(lock);
-  }
-
-  @Timed("scheduler_lock_remove")
-  @Override
-  public void removeLock(final ILockKey lockKey) {
-    checkNotNull(lockKey);
-
-    log(Op.removeLock(new RemoveLock(lockKey.newBuilder())));
-    super.removeLock(lockKey);
-  }
+  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
+      throws StorageException, E {
 
-  @Override
-  public boolean setMaintenanceMode(final String host, final MaintenanceMode mode) {
-    checkNotNull(host);
-    checkNotNull(mode);
-
-    Optional<HostAttributes> saved = getHostAttributes(host);
-    if (saved.isPresent()) {
-      HostAttributes attributes = saved.get().setMode(mode);
-      log(Op.saveHostAttributes(new SaveHostAttributes(attributes)));
-      super.saveHostAttributes(attributes);
-      return true;
-    }
-    return false;
+    return writeBehindStorage.weaklyConsistentRead(work);
   }
 
   @Override
@@ -693,14 +554,4 @@ public class LogStorage extends ForwardingStore
       throw new StorageException("Failed to create a snapshot", e);
     }
   }
-
-  private void log(Op op) {
-    Preconditions.checkState(
-        !recovered || (transaction != null),
-        "Mutating operations must be done during recovery or within a transaction.");
-
-    if (recovered) {
-      transaction.add(op);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1713d082/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
new file mode 100644
index 0000000..e777e59
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -0,0 +1,343 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveLock;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.RewriteTask;
+import org.apache.aurora.gen.storage.SaveAcceptedJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveLock;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.ForwardingStore;
+import org.apache.aurora.scheduler.storage.JobStore;
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.entities.ILockKey;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static org.apache.aurora.scheduler.storage.log.LogStorage.TransactionManager;
+
+/**
+ * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
+ * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
+ * stores.
+ */
+class WriteAheadStorage extends ForwardingStore implements
+    MutableStoreProvider,
+    SchedulerStore.Mutable,
+    JobStore.Mutable,
+    TaskStore.Mutable,
+    LockStore.Mutable,
+    QuotaStore.Mutable,
+    AttributeStore.Mutable {
+
+  private static final Logger LOG = Logger.getLogger(WriteAheadStorage.class.getName());
+
+  private final TransactionManager transactionManager;
+  private final SchedulerStore.Mutable schedulerStore;
+  private final JobStore.Mutable jobStore;
+  private final TaskStore.Mutable taskStore;
+  private final LockStore.Mutable lockStore;
+  private final QuotaStore.Mutable quotaStore;
+  private final AttributeStore.Mutable attributeStore;
+
+  /**
+   * Creates a new write-ahead storage that delegates to the providing default stores.
+   *
+   * @param transactionManager External controller for transaction operations.
+   * @param schedulerStore Delegate.
+   * @param jobStore       Delegate.
+   * @param taskStore      Delegate.
+   * @param lockStore      Delegate.
+   * @param quotaStore     Delegate.
+   * @param attributeStore Delegate.
+   */
+  WriteAheadStorage(
+      TransactionManager transactionManager,
+      SchedulerStore.Mutable schedulerStore,
+      JobStore.Mutable jobStore,
+      TaskStore.Mutable taskStore,
+      LockStore.Mutable lockStore,
+      QuotaStore.Mutable quotaStore,
+      AttributeStore.Mutable attributeStore) {
+
+    super(schedulerStore, jobStore, taskStore, lockStore, quotaStore, attributeStore);
+
+    this.transactionManager = checkNotNull(transactionManager);
+    this.schedulerStore = checkNotNull(schedulerStore);
+    this.jobStore = checkNotNull(jobStore);
+    this.taskStore = checkNotNull(taskStore);
+    this.lockStore = checkNotNull(lockStore);
+    this.quotaStore = checkNotNull(quotaStore);
+    this.attributeStore = checkNotNull(attributeStore);
+  }
+
+  private void log(Op op) {
+    Preconditions.checkState(
+        transactionManager.hasActiveTransaction(),
+        "Mutating operations must be within a transaction.");
+    transactionManager.log(op);
+  }
+
+  @Timed("scheduler_log_save_framework_id")
+  @Override
+  public void saveFrameworkId(final String frameworkId) {
+    checkNotNull(frameworkId);
+
+    log(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+    schedulerStore.saveFrameworkId(frameworkId);
+  }
+
+  @Timed("scheduler_log_unsafe_modify_in_place")
+  @Override
+  public boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration) {
+    checkNotNull(taskId);
+    checkNotNull(taskConfiguration);
+
+    boolean mutated = taskStore.unsafeModifyInPlace(taskId, taskConfiguration);
+    if (mutated) {
+      log(Op.rewriteTask(new RewriteTask(taskId, taskConfiguration.newBuilder())));
+    }
+    return mutated;
+  }
+
+  @Timed("scheduler_log_tasks_remove")
+  @Override
+  public void deleteTasks(final Set<String> taskIds) {
+    checkNotNull(taskIds);
+
+    log(Op.removeTasks(new RemoveTasks(taskIds)));
+    taskStore.deleteTasks(taskIds);
+  }
+
+  @Timed("scheduler_log_tasks_save")
+  @Override
+  public void saveTasks(final Set<IScheduledTask> newTasks) {
+    checkNotNull(newTasks);
+
+    log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
+    taskStore.saveTasks(newTasks);
+  }
+
+  @Timed("scheduler_log_tasks_mutate")
+  @Override
+  public ImmutableSet<IScheduledTask> mutateTasks(
+      final Query.Builder query,
+      final Function<IScheduledTask, IScheduledTask> mutator) {
+
+    checkNotNull(query);
+    checkNotNull(mutator);
+
+    ImmutableSet<IScheduledTask> mutated = taskStore.mutateTasks(query, mutator);
+
+    Map<String, IScheduledTask> tasksById = Tasks.mapById(mutated);
+    if (LOG.isLoggable(Level.FINE)) {
+      LOG.fine("Storing updated tasks to log: "
+          + Maps.transformValues(tasksById, Tasks.GET_STATUS));
+    }
+
+    // TODO(William Farner): Avoid writing an op when mutated is empty.
+    log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))));
+    return mutated;
+  }
+
+  @Timed("scheduler_log_quota_save")
+  @Override
+  public void saveQuota(final String role, final IResourceAggregate quota) {
+    checkNotNull(role);
+    checkNotNull(quota);
+
+    log(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+    quotaStore.saveQuota(role, quota);
+  }
+
+  @Timed("scheduler_save_host_attribute")
+  @Override
+  public void saveHostAttributes(final HostAttributes attrs) {
+    checkNotNull(attrs);
+
+    // Pass the updated attributes upstream, and then check if the stored value changes.
+    // We do this since different parts of the system write partial HostAttributes objects
+    // and they are merged together internally.
+    // TODO(William Farner): Split out a separate method
+    //                       saveAttributes(String host, Iterable<Attributes>) to simplify this.
+    Optional<HostAttributes> saved = getHostAttributes(attrs.getHost());
+    attributeStore.saveHostAttributes(attrs);
+    Optional<HostAttributes> updated = getHostAttributes(attrs.getHost());
+    if (!saved.equals(updated)) {
+      log(Op.saveHostAttributes(new SaveHostAttributes(updated.get())));
+    }
+  }
+
+  @Timed("scheduler_log_job_remove")
+  @Override
+  public void removeJob(final IJobKey jobKey) {
+    checkNotNull(jobKey);
+
+    log(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
+    jobStore.removeJob(jobKey);
+  }
+
+  @Timed("scheduler_log_job_save")
+  @Override
+  public void saveAcceptedJob(final String managerId, final IJobConfiguration jobConfig) {
+    checkNotNull(managerId);
+    checkNotNull(jobConfig);
+
+    log(Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())));
+    jobStore.saveAcceptedJob(managerId, jobConfig);
+  }
+
+  @Timed("scheduler_log_quota_remove")
+  @Override
+  public void removeQuota(final String role) {
+    checkNotNull(role);
+
+    log(Op.removeQuota(new RemoveQuota(role)));
+    quotaStore.removeQuota(role);
+  }
+
+  @Timed("scheduler_lock_save")
+  @Override
+  public void saveLock(final ILock lock) {
+    checkNotNull(lock);
+
+    log(Op.saveLock(new SaveLock(lock.newBuilder())));
+    lockStore.saveLock(lock);
+  }
+
+  @Timed("scheduler_lock_remove")
+  @Override
+  public void removeLock(final ILockKey lockKey) {
+    checkNotNull(lockKey);
+
+    log(Op.removeLock(new RemoveLock(lockKey.newBuilder())));
+    lockStore.removeLock(lockKey);
+  }
+
+  @Override
+  public void deleteAllTasks() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteHostAttributes() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteJobs() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteQuotas() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteLocks() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public boolean setMaintenanceMode(final String host, final MaintenanceMode mode) {
+    checkNotNull(host);
+    checkNotNull(mode);
+
+    Optional<HostAttributes> saved = getHostAttributes(host);
+    if (saved.isPresent()) {
+      HostAttributes attributes = saved.get().setMode(mode);
+      log(Op.saveHostAttributes(new SaveHostAttributes(attributes)));
+      attributeStore.saveHostAttributes(attributes);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public SchedulerStore.Mutable getSchedulerStore() {
+    return this;
+  }
+
+  @Override
+  public JobStore.Mutable getJobStore() {
+    return this;
+  }
+
+  @Override
+  public TaskStore.Mutable getUnsafeTaskStore() {
+    return this;
+  }
+
+  @Override
+  public LockStore.Mutable getLockStore() {
+    return this;
+  }
+
+  @Override
+  public QuotaStore.Mutable getQuotaStore() {
+    return this;
+  }
+
+  @Override
+  public AttributeStore.Mutable getAttributeStore() {
+    return this;
+  }
+
+  @Override
+  public TaskStore getTaskStore() {
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1713d082/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 8b26cc0..3437743 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -558,7 +558,6 @@ public class LogStorageTest extends EasyMockTest {
       @Override
       protected void setupExpectations() throws Exception {
         storageUtil.expectWriteOperation();
-        expect(storageUtil.taskStore.fetchTasks(Query.unscoped())).andReturn(ImmutableSet.of(task));
         storageUtil.taskStore.deleteTasks(taskIds);
         streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
             .andReturn(position);
@@ -566,7 +565,7 @@ public class LogStorageTest extends EasyMockTest {
 
       @Override
       protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().deleteAllTasks();
+        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
       }
     }.run();
   }
@@ -714,17 +713,6 @@ public class LogStorageTest extends EasyMockTest {
     }.run();
   }
 
-  @Test(expected = IllegalStateException.class)
-  public void testMutateRequiresWriteOperation() throws Exception {
-    new StorageTestFixture() {
-
-      @Override
-      protected void runTest() {
-        logStorage.deleteTasks(ImmutableSet.of("a"));
-      }
-    }.run();
-  }
-
   private LogEntry createTransaction(Op... ops) {
     return LogEntry.transaction(
         new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));