You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/22 00:44:05 UTC

[1/2] incubator-aurora git commit: Remove ReadWriteLock from MemStorage, remove Storage#weaklyConsistentRead.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master ecc3fbcac -> 5116c2209


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/db/DbQuotaStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbQuotaStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbQuotaStoreTest.java
index 6d8d52a..dc27c3e 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbQuotaStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbQuotaStoreTest.java
@@ -97,7 +97,7 @@ public class DbQuotaStoreTest {
   }
 
   private Optional<IResourceAggregate> select(final String role) {
-    return storage.consistentRead(new Work.Quiet<Optional<IResourceAggregate>>() {
+    return storage.read(new Work.Quiet<Optional<IResourceAggregate>>() {
       @Override
       public Optional<IResourceAggregate> apply(StoreProvider storeProvider) {
         return storeProvider.getQuotaStore().fetchQuota(role);
@@ -108,7 +108,7 @@ public class DbQuotaStoreTest {
   private void assertQuotas(Map<String, IResourceAggregate> quotas) {
     assertEquals(
         quotas,
-        storage.consistentRead(new Work.Quiet<Map<String, IResourceAggregate>>() {
+        storage.read(new Work.Quiet<Map<String, IResourceAggregate>>() {
           @Override
           public Map<String, IResourceAggregate> apply(StoreProvider storeProvider) {
             return storeProvider.getQuotaStore().fetchQuotas();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/db/DbSchedulerStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbSchedulerStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbSchedulerStoreTest.java
index 9c00c8b..0bfb4d4 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbSchedulerStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbSchedulerStoreTest.java
@@ -55,7 +55,7 @@ public class DbSchedulerStoreTest {
   }
 
   private Optional<String> select() {
-    return storage.consistentRead(new Work.Quiet<Optional<String>>() {
+    return storage.read(new Work.Quiet<Optional<String>>() {
       @Override
       public Optional<String> apply(StoreProvider storeProvider) {
         return storeProvider.getSchedulerStore().fetchFrameworkId();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 259c6a9..0a5cc51 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.storage.log;
 import java.util.EnumSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
@@ -177,7 +178,8 @@ public class LogStorageTest extends EasyMockTest {
         storageUtil.quotaStore,
         storageUtil.attributeStore,
         storageUtil.jobUpdateStore,
-        eventSink);
+        eventSink,
+        new ReentrantLock());
 
     stream = createMock(Stream.class);
     streamMatcher = LogOpMatcher.matcherFor(stream);
@@ -481,7 +483,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws CodingException {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.schedulerStore.saveFrameworkId(frameworkId);
         streamMatcher.expectTransaction(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)))
             .andReturn(position);
@@ -502,7 +504,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.jobStore.saveAcceptedJob(managerId, jobConfig);
         streamMatcher.expectTransaction(
             Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())))
@@ -521,7 +523,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.jobStore.removeJob(JOB_KEY);
         streamMatcher.expectTransaction(
             Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder())))
@@ -541,7 +543,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.taskStore.saveTasks(tasks);
         streamMatcher.expectTransaction(
             Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))))
@@ -564,7 +566,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
         streamMatcher.expectTransaction(
             Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))))
@@ -587,7 +589,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         expect(storageUtil.taskStore.unsafeModifyInPlace(taskId2, updatedConfig)).andReturn(false);
         expect(storageUtil.taskStore.unsafeModifyInPlace(taskId, updatedConfig)).andReturn(true);
         streamMatcher.expectTransaction(
@@ -614,7 +616,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
 
         storageUtil.taskStore.deleteTasks(tasksToRemove);
@@ -649,7 +651,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.taskStore.saveTasks(saved);
 
         // Nested transaction with result.
@@ -679,7 +681,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.taskStore.saveTasks(saved);
 
         // Nested transaction with result.
@@ -710,7 +712,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.taskStore.deleteTasks(taskIds);
         streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
             .andReturn(position);
@@ -729,7 +731,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.taskStore.deleteTasks(taskIds);
         streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
             .andReturn(position);
@@ -751,7 +753,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.quotaStore.saveQuota(role, quota);
         streamMatcher.expectTransaction(Op.saveQuota(new SaveQuota(role, quota.newBuilder())))
             .andReturn(position);
@@ -770,7 +772,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.quotaStore.removeQuota(role);
         streamMatcher.expectTransaction(Op.removeQuota(new RemoveQuota(role))).andReturn(position);
       }
@@ -792,7 +794,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.lockStore.saveLock(lock);
         streamMatcher.expectTransaction(Op.saveLock(new SaveLock(lock.newBuilder())))
             .andReturn(position);
@@ -812,7 +814,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.lockStore.removeLock(lockKey);
         streamMatcher.expectTransaction(Op.removeLock(new RemoveLock(lockKey.newBuilder())))
             .andReturn(position);
@@ -838,7 +840,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         expect(storageUtil.attributeStore.getHostAttributes(host))
             .andReturn(Optional.<IHostAttributes>absent());
 
@@ -902,7 +904,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.jobUpdateStore.saveJobUpdate(update, lockToken);
         streamMatcher.expectTransaction(
             Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder(), lockToken.orNull())))
@@ -925,7 +927,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.jobUpdateStore.saveJobUpdateEvent(event, UPDATE_ID);
         streamMatcher.expectTransaction(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(
             event.newBuilder(),
@@ -949,7 +951,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(event, UPDATE_ID);
         streamMatcher.expectTransaction(Op.saveJobInstanceUpdateEvent(
             new SaveJobInstanceUpdateEvent(event.newBuilder(), UPDATE_ID))).andReturn(position);
@@ -971,7 +973,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectWriteOperation();
+        storageUtil.expectWrite();
         expect(storageUtil.jobUpdateStore.pruneHistory(
             pruneHistory.getPerJobRetainCount(),
             pruneHistory.getHistoryPruneThresholdMs())).andReturn(ImmutableSet.of("id1"));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
index 463b445..30e2328 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
@@ -45,7 +45,6 @@ import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -58,7 +57,6 @@ import static org.junit.Assert.fail;
 public class MemStorageTest extends TearDownTestCase {
 
   private ExecutorService executor;
-  private FakeStatsProvider statsProvider;
   private Storage storage;
 
   @Before
@@ -71,8 +69,7 @@ public class MemStorageTest extends TearDownTestCase {
         new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
       }
     });
-    statsProvider = new FakeStatsProvider();
-    storage = MemStorage.newEmptyStorage(statsProvider);
+    storage = MemStorage.newEmptyStorage();
   }
 
   @Test
@@ -85,7 +82,7 @@ public class MemStorageTest extends TearDownTestCase {
     Future<String> future = executor.submit(new Callable<String>() {
       @Override
       public String call() throws Exception {
-        return storage.consistentRead(new Work.Quiet<String>() {
+        return storage.read(new Work.Quiet<String>() {
           @Override
           public String apply(StoreProvider storeProvider) {
             slowReadStarted.countDown();
@@ -102,7 +99,7 @@ public class MemStorageTest extends TearDownTestCase {
 
     slowReadStarted.await();
 
-    String fastResult = storage.consistentRead(new Work.Quiet<String>() {
+    String fastResult = storage.read(new Work.Quiet<String>() {
       @Override
       public String apply(StoreProvider storeProvider) {
         return "fastResult";
@@ -111,13 +108,6 @@ public class MemStorageTest extends TearDownTestCase {
     assertEquals("fastResult", fastResult);
     slowReadFinished.countDown();
     assertEquals("slowResult", future.get());
-
-    // It would be nice to check the stat values (specifically with simulated lock contention, but
-    // this value is based off ReentrantReadWriteLock#getQueueLength(), which is documented as an
-    // approximation and not a guaranteed accurate value.
-    assertEquals(
-        ImmutableSet.of(MemStorage.THREADS_WAITING_GAUGE),
-        statsProvider.getAllValues().keySet());
   }
 
   private IScheduledTask makeTask(String taskId) {
@@ -143,7 +133,7 @@ public class MemStorageTest extends TearDownTestCase {
   }
 
   private void expectTasks(final String... taskIds) {
-    storage.consistentRead(new Work.Quiet<Void>() {
+    storage.read(new Work.Quiet<Void>() {
       @Override
       public Void apply(StoreProvider storeProvider) {
         Query.Builder query = Query.unscoped();
@@ -174,12 +164,12 @@ public class MemStorageTest extends TearDownTestCase {
       // Expected
     }
 
-    storage.consistentRead(new Work.Quiet<Void>() {
+    storage.read(new Work.Quiet<Void>() {
       @Override
       public Void apply(StoreProvider storeProvider) {
         // If the previous write was under a transaction then there would be no quota records.
         assertEquals(ImmutableMap.<String, IResourceAggregate>of(),
-                storeProvider.getQuotaStore().fetchQuotas());
+            storeProvider.getQuotaStore().fetchQuotas());
         return null;
       }
     });
@@ -221,7 +211,7 @@ public class MemStorageTest extends TearDownTestCase {
       }
     });
     expectTasks("a");
-    storage.consistentRead(new Work.Quiet<Void>() {
+    storage.read(new Work.Quiet<Void>() {
       @Override
       public Void apply(StoreProvider storeProvider) {
         assertEquals(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
index 6918cba..d492e17 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -73,9 +73,9 @@ public class StorageTestUtil {
     this.storage = easyMock.createMock(NonVolatileStorage.class);
   }
 
-  public <T> IExpectationSetters<T> expectConsistentRead() {
+  public <T> IExpectationSetters<T> expectRead() {
     final Capture<Work<T, RuntimeException>> work = EasyMockTest.createCapture();
-    return expect(storage.<T, RuntimeException>consistentRead(capture(work)))
+    return expect(storage.read(capture(work)))
         .andAnswer(new IAnswer<T>() {
           @Override
           public T answer() {
@@ -84,20 +84,9 @@ public class StorageTestUtil {
         });
   }
 
-  public <T> IExpectationSetters<T> expectWeaklyConsistentRead() {
-    final Capture<Work<T, RuntimeException>> work = EasyMockTest.createCapture();
-    return expect(storage.<T, RuntimeException>weaklyConsistentRead(capture(work)))
-        .andAnswer(new IAnswer<T>() {
-          @Override
-          public T answer() {
-            return work.getValue().apply(storeProvider);
-          }
-        });
-  }
-
-  public <T> IExpectationSetters<T> expectWriteOperation() {
+  public <T> IExpectationSetters<T> expectWrite() {
     final Capture<MutateWork<T, RuntimeException>> work = EasyMockTest.createCapture();
-    return expect(storage.<T, RuntimeException>write(capture(work))).andAnswer(new IAnswer<T>() {
+    return expect(storage.write(capture(work))).andAnswer(new IAnswer<T>() {
       @Override
       public T answer() {
         return work.getValue().apply(mutableStoreProvider);
@@ -124,9 +113,8 @@ public class StorageTestUtil {
     expect(mutableStoreProvider.getLockStore()).andReturn(lockStore).anyTimes();
     expect(mutableStoreProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
     expect(mutableStoreProvider.getJobUpdateStore()).andReturn(jobUpdateStore).anyTimes();
-    expectConsistentRead().anyTimes();
-    expectWeaklyConsistentRead().anyTimes();
-    expectWriteOperation().anyTimes();
+    expectRead().anyTimes();
+    expectWrite().anyTimes();
   }
 
   public IExpectationSetters<?> expectTaskFetch(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 2533d82..4c827b1 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -217,7 +217,7 @@ public class JobUpdaterIT extends EasyMockTest {
 
   private String getTaskId(IJobKey job, int instanceId) {
     return Tasks.id(Iterables.getOnlyElement(
-        Storage.Util.consistentFetchTasks(
+        Storage.Util.fetchTasks(
             storage,
             Query.instanceScoped(job, instanceId).active())));
   }
@@ -271,7 +271,7 @@ public class JobUpdaterIT extends EasyMockTest {
       JobUpdateStatus expected,
       Multimap<Integer, JobUpdateAction> expectedActions) {
 
-    IJobUpdateDetails details = storage.consistentRead(new Work.Quiet<IJobUpdateDetails>() {
+    IJobUpdateDetails details = storage.read(new Work.Quiet<IJobUpdateDetails>() {
       @Override
       public IJobUpdateDetails apply(StoreProvider storeProvider) {
         return storeProvider.getJobUpdateStore().fetchJobUpdateDetails(UPDATE_ID).get();
@@ -314,7 +314,7 @@ public class JobUpdaterIT extends EasyMockTest {
 
   private void assertJobState(IJobKey job, Map<Integer, ITaskConfig> expected) {
     Iterable<IScheduledTask> tasks =
-        Storage.Util.consistentFetchTasks(storage, Query.jobScoped(job).active());
+        Storage.Util.fetchTasks(storage, Query.jobScoped(job).active());
 
     Map<Integer, IScheduledTask> tasksByInstance =
         Maps.uniqueIndex(tasks, Tasks.SCHEDULED_TO_INSTANCE_ID);


[2/2] incubator-aurora git commit: Remove ReadWriteLock from MemStorage, remove Storage#weaklyConsistentRead.

Posted by wf...@apache.org.
Remove ReadWriteLock from MemStorage, remove Storage#weaklyConsistentRead.

Bugs closed: AURORA-929

Reviewed at https://reviews.apache.org/r/28097/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/5116c220
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/5116c220
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/5116c220

Branch: refs/heads/master
Commit: 5116c22098ccef2dbc35ad1b71992f2868406e32
Parents: ecc3fbc
Author: Bill Farner <wf...@apache.org>
Authored: Wed Nov 19 15:35:04 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Nov 21 15:42:39 2014 -0800

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/TaskVars.java   |   2 +-
 .../scheduler/async/GcExecutorLauncher.java     |   4 +-
 .../aurora/scheduler/async/KillRetry.java       |   2 +-
 .../scheduler/async/RescheduleCalculator.java   |   5 +-
 .../scheduler/async/TaskHistoryPruner.java      |   5 +-
 .../async/preemptor/LiveClusterState.java       |   2 +-
 .../scheduler/async/preemptor/Preemptor.java    |   5 +-
 .../scheduler/cron/quartz/AuroraCronJob.java    |   2 +-
 .../cron/quartz/CronJobManagerImpl.java         |   6 +-
 .../aurora/scheduler/http/Maintenance.java      |   2 +-
 .../org/apache/aurora/scheduler/http/Mname.java |   2 +-
 .../apache/aurora/scheduler/http/Quotas.java    |   2 +-
 .../apache/aurora/scheduler/http/Slaves.java    |   2 +-
 .../aurora/scheduler/http/StructDump.java       |   2 +-
 .../scheduler/mesos/SchedulerDriverService.java |   2 +-
 .../aurora/scheduler/quota/QuotaManager.java    |   2 +-
 .../aurora/scheduler/sla/MetricCalculator.java  |   2 +-
 .../aurora/scheduler/state/LockManagerImpl.java |   4 +-
 .../scheduler/state/MaintenanceController.java  |   4 +-
 .../aurora/scheduler/stats/ResourceCounter.java |   4 +-
 .../storage/CallOrderEnforcingStorage.java      |  12 +-
 .../scheduler/storage/ReadWriteLockManager.java | 131 -------------------
 .../aurora/scheduler/storage/Storage.java       |  58 ++------
 .../storage/backup/TemporaryStorage.java        |   2 +-
 .../aurora/scheduler/storage/db/DbStorage.java  |  29 ++--
 .../scheduler/storage/log/LogStorage.java       | 109 ++++++++-------
 .../scheduler/storage/log/LogStorageModule.java |  23 ++--
 .../storage/log/SnapshotStoreImpl.java          |   2 +-
 .../scheduler/storage/mem/MemStorage.java       | 112 ++--------------
 .../thrift/SchedulerThriftInterface.java        |  14 +-
 .../app/local/FakeNonVolatileStorage.java       |  11 +-
 .../cron/quartz/CronJobManagerImplTest.java     |   2 +-
 .../filter/SchedulingFilterImplTest.java        |  25 ----
 .../scheduler/state/LockManagerImplTest.java    |   2 +-
 .../scheduler/state/StateManagerImplTest.java   |  10 +-
 .../storage/ReadWriteLockManagerTest.java       | 109 ---------------
 .../scheduler/storage/StorageBackfillTest.java  |   8 +-
 .../storage/db/DBJobUpdateStoreTest.java        |  12 +-
 .../storage/db/DbAttributeStoreTest.java        |   4 +-
 .../scheduler/storage/db/DbLockStoreTest.java   |   4 +-
 .../scheduler/storage/db/DbQuotaStoreTest.java  |   4 +-
 .../storage/db/DbSchedulerStoreTest.java        |   2 +-
 .../scheduler/storage/log/LogStorageTest.java   |  44 ++++---
 .../scheduler/storage/mem/MemStorageTest.java   |  24 +---
 .../storage/testing/StorageTestUtil.java        |  24 +---
 .../aurora/scheduler/updater/JobUpdaterIT.java  |   6 +-
 46 files changed, 205 insertions(+), 639 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index 2f93054..f017cdd 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -167,7 +167,7 @@ class TaskVars extends AbstractIdleService implements EventSubscriber {
     if (Strings.isNullOrEmpty(task.getAssignedTask().getSlaveHost())) {
       rack = Optional.absent();
     } else {
-      rack = storage.consistentRead(new Work.Quiet<Optional<String>>() {
+      rack = storage.read(new Work.Quiet<Optional<String>>() {
         @Override
         public Optional<String> apply(StoreProvider storeProvider) {
           Optional<IAttribute> rack = FluentIterable

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
index e02921d..5226e3d 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -179,9 +179,7 @@ public class GcExecutorLauncher implements TaskLauncher {
   }
 
   private TaskInfo makeGcTask(String hostName, SlaveID slaveId) {
-    Set<IScheduledTask> tasksOnHost =
-        Storage.Util.weaklyConsistentFetchTasks(storage, Query.slaveScoped(hostName));
-
+    Set<IScheduledTask> tasksOnHost = Storage.Util.fetchTasks(storage, Query.slaveScoped(hostName));
     tasksCreated.incrementAndGet();
     return makeGcTask(
         hostName,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
index d6b7fab..3bb80ec 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
@@ -88,7 +88,7 @@ public class KillRetry implements EventSubscriber {
     @Override
     public void run() {
       Query.Builder query = Query.taskScoped(taskId).byStatus(ScheduleStatus.KILLING);
-      if (!Storage.Util.weaklyConsistentFetchTasks(storage, query).isEmpty()) {
+      if (!Storage.Util.fetchTasks(storage, query).isEmpty()) {
         LOG.info("Task " + taskId + " not yet killed, retrying.");
 
         // Kill did not yet take effect, try again.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
index ca54c9a..0cf7fb4 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -25,7 +25,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.twitter.common.quantity.Amount;
@@ -147,8 +146,8 @@ public interface RescheduleCalculator {
         return Optional.absent();
       }
 
-      ImmutableSet<IScheduledTask> res =
-          Storage.Util.weaklyConsistentFetchTasks(storage, Query.taskScoped(task.getAncestorId()));
+      Set<IScheduledTask> res =
+          Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId()));
 
       return Optional.fromNullable(Iterables.getOnlyElement(res, null));
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
index 58d074b..985a319 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.async;
 
-import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -156,8 +155,8 @@ public class TaskHistoryPruner implements EventSubscriber {
     executor.submit(new Runnable() {
       @Override
       public void run() {
-        Collection<IScheduledTask> inactiveTasks =
-            Storage.Util.weaklyConsistentFetchTasks(storage, jobHistoryQuery(jobKey));
+        Set<IScheduledTask> inactiveTasks =
+            Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
         int tasksToPrune = inactiveTasks.size() - settings.perJobHistoryGoal;
         if (tasksToPrune > 0 && inactiveTasks.size() > settings.perJobHistoryGoal) {
           Set<String> toPrune = FluentIterable

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
index 0da4d2a..9d83acc 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
@@ -59,7 +59,7 @@ class LiveClusterState implements ClusterState {
   public Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
     // Only non-pending active tasks may be preempted.
     Iterable<IAssignedTask> activeTasks = Iterables.transform(
-        Storage.Util.consistentFetchTasks(storage, CANDIDATE_QUERY),
+        Storage.Util.fetchTasks(storage, CANDIDATE_QUERY),
         SCHEDULED_TO_ASSIGNED);
 
     // Group the tasks by slave id so they can be paired with offers from the same slave.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
index afbd645..767e601 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
@@ -53,6 +53,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.Util;
+import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -275,7 +276,7 @@ public interface Preemptor {
     }
 
     private Optional<IHostAttributes> getHostAttributes(final String host) {
-      return storage.weaklyConsistentRead(new Storage.Work.Quiet<Optional<IHostAttributes>>() {
+      return storage.read(new Work.Quiet<Optional<IHostAttributes>>() {
         @Override
         public Optional<IHostAttributes> apply(Storage.StoreProvider storeProvider) {
           return storeProvider.getAttributeStore().getHostAttributes(host);
@@ -294,7 +295,7 @@ public interface Preemptor {
     private Optional<IAssignedTask> fetchIdlePendingTask(String taskId) {
       Query.Builder query = Query.taskScoped(taskId).byStatus(PENDING);
       Iterable<IAssignedTask> result = FluentIterable
-          .from(Util.consistentFetchTasks(storage, query))
+          .from(Util.fetchTasks(storage, query))
           .filter(isIdleTask)
           .transform(SCHEDULED_TO_ASSIGNED);
       return Optional.fromNullable(Iterables.getOnlyElement(result, null));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
index 84e37e4..efea7ad 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -205,7 +205,7 @@ class AuroraCronJob implements Job {
       delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() {
         @Override
         public Boolean get() {
-          if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
+          if (Storage.Util.fetchTasks(storage, query).isEmpty()) {
             LOG.info("Initiating delayed launch of cron " + path);
             storage.write(new Storage.MutateWork.NoResult.Quiet() {
               @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
index 8e2d3d9..28f1ae7 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
@@ -74,7 +74,7 @@ class CronJobManagerImpl implements CronJobManager {
   public void startJobNow(final IJobKey jobKey) throws CronException {
     requireNonNull(jobKey);
 
-    storage.weaklyConsistentRead(new Work<Void, CronException>() {
+    storage.read(new Work<Void, CronException>() {
       @Override
       public Void apply(Storage.StoreProvider storeProvider) throws CronException {
         checkCronExists(jobKey, storeProvider.getJobStore());
@@ -182,7 +182,7 @@ class CronJobManagerImpl implements CronJobManager {
   @Override
   public Iterable<IJobConfiguration> getJobs() {
     // NOTE: no synchronization is needed here since we don't touch internal quartz state.
-    return storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>() {
+    return storage.read(new Work.Quiet<Iterable<IJobConfiguration>>() {
       @Override
       public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
         return storeProvider.getJobStore().fetchJobs(getManagerKey());
@@ -194,7 +194,7 @@ class CronJobManagerImpl implements CronJobManager {
   public boolean hasJob(final IJobKey jobKey) {
     requireNonNull(jobKey);
 
-    return storage.consistentRead(new Work.Quiet<Boolean>() {
+    return storage.read(new Work.Quiet<Boolean>() {
       @Override
       public Boolean apply(Storage.StoreProvider storeProvider) {
         return storeProvider.getJobStore().fetchJob(getManagerKey(), jobKey).isPresent();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
index be8a1ff..303f05c 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
@@ -58,7 +58,7 @@ public class Maintenance {
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   public Response getHosts() {
-    return storage.weaklyConsistentRead(new Work.Quiet<Response>() {
+    return storage.read(new Work.Quiet<Response>() {
       @Override
       public Response apply(StoreProvider storeProvider) {
         Multimap<MaintenanceMode, String> hostsByMode =

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/http/Mname.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Mname.java b/src/main/java/org/apache/aurora/scheduler/http/Mname.java
index 883f954..69ce2aa 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Mname.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Mname.java
@@ -185,7 +185,7 @@ public class Mname {
       Optional<String> forwardRequest) {
 
     IScheduledTask task = Iterables.getOnlyElement(
-        Storage.Util.consistentFetchTasks(storage,
+        Storage.Util.fetchTasks(storage,
             Query.instanceScoped(JobKeys.from(role, env, job), instanceId).active()),
         null);
     if (task == null) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Quotas.java b/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
index 5f3cce3..e1bf0cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
@@ -56,7 +56,7 @@ public class Quotas {
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   public Response getQuotas(@QueryParam("role") final String role) {
-    return storage.weaklyConsistentRead(new Work.Quiet<Response>() {
+    return storage.read(new Work.Quiet<Response>() {
       @Override
       public Response apply(StoreProvider storeProvider) {
         Map<String, IResourceAggregate> quotas;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Slaves.java b/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
index 0ea462f..b64e18c 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
@@ -63,7 +63,7 @@ public class Slaves extends JerseyTemplateServlet {
   }
 
   private Iterable<IHostAttributes> getHostAttributes() {
-    return storage.weaklyConsistentRead(new Work.Quiet<Iterable<IHostAttributes>>() {
+    return storage.read(new Work.Quiet<Iterable<IHostAttributes>>() {
       @Override
       public Iterable<IHostAttributes> apply(StoreProvider storeProvider) {
         return storeProvider.getAttributeStore().getHostAttributes();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
index 8147d54..12b28be 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
@@ -122,7 +122,7 @@ public class StructDump extends JerseyTemplateServlet {
       @Override
       public void execute(StringTemplate template) {
         template.setAttribute("id", id);
-        Optional<? extends TBase<?, ?>> struct = storage.weaklyConsistentRead(work);
+        Optional<? extends TBase<?, ?>> struct = storage.read(work);
         if (struct.isPresent()) {
           template.setAttribute("structPretty", Util.prettyPrint(struct.get()));
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
index 88150e5..da2d5df 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
@@ -68,7 +68,7 @@ class SchedulerDriverService extends AbstractIdleService implements Driver {
 
   @Override
   protected void startUp() {
-    Optional<String> frameworkId = storage.consistentRead(
+    Optional<String> frameworkId = storage.read(
         new Storage.Work.Quiet<Optional<String>>() {
           @Override
           public Optional<String> apply(Storage.StoreProvider storeProvider) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index e38407e..934b920 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -193,7 +193,7 @@ public interface QuotaManager {
      * @return {@code QuotaInfo} with quota and consumption details.
      */
     private QuotaInfo getQuotaInfo(final String role, final Optional<IJobUpdate> requestedUpdate) {
-      return storage.consistentRead(new Work.Quiet<QuotaInfo>() {
+      return storage.read(new Work.Quiet<QuotaInfo>() {
         @Override
         public QuotaInfo apply(StoreProvider storeProvider) {
           FluentIterable<IScheduledTask> tasks = FluentIterable.from(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
index 149bb33..82f36d5 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
@@ -168,7 +168,7 @@ class MetricCalculator implements Runnable {
   @Override
   public void run() {
     FluentIterable<IScheduledTask> tasks =
-        FluentIterable.from(Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped()));
+        FluentIterable.from(Storage.Util.fetchTasks(storage, Query.unscoped()));
 
     List<IScheduledTask> prodTasks = tasks.filter(Predicates.compose(
         Predicates.and(Tasks.IS_PRODUCTION, IS_SERVICE),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
index f167290..6aa281c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -96,7 +96,7 @@ public class LockManagerImpl implements LockManager {
   public void validateIfLocked(final ILockKey context, Optional<ILock> heldLock)
       throws LockException {
 
-    Optional<ILock> stored = storage.consistentRead(new Work.Quiet<Optional<ILock>>() {
+    Optional<ILock> stored = storage.read(new Work.Quiet<Optional<ILock>>() {
       @Override
       public Optional<ILock> apply(StoreProvider storeProvider) {
         return storeProvider.getLockStore().fetchLock(context);
@@ -125,7 +125,7 @@ public class LockManagerImpl implements LockManager {
 
   @Override
   public Iterable<ILock> getLocks() {
-    return storage.weaklyConsistentRead(new Work.Quiet<Iterable<ILock>>() {
+    return storage.read(new Work.Quiet<Iterable<ILock>>() {
       @Override
       public Iterable<ILock> apply(StoreProvider storeProvider) {
         return storeProvider.getLockStore().fetchLocks();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index a835eaa..8e6f4e7 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -219,7 +219,7 @@ public interface MaintenanceController {
 
     @Override
     public MaintenanceMode getMode(final String host) {
-      return storage.weaklyConsistentRead(new Work.Quiet<MaintenanceMode>() {
+      return storage.read(new Work.Quiet<MaintenanceMode>() {
         @Override
         public MaintenanceMode apply(StoreProvider storeProvider) {
           return storeProvider.getAttributeStore().getHostAttributes(host)
@@ -232,7 +232,7 @@ public interface MaintenanceController {
 
     @Override
     public Set<HostStatus> getStatus(final Set<String> hosts) {
-      return storage.weaklyConsistentRead(new Work.Quiet<Set<HostStatus>>() {
+      return storage.read(new Work.Quiet<Set<HostStatus>>() {
         @Override
         public Set<HostStatus> apply(StoreProvider storeProvider) {
           // Warning - this is filtering _all_ host attributes.  If using this to frequently query

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
index c30a8c9..e5c0322 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
@@ -53,7 +53,7 @@ public class ResourceCounter {
 
   private Iterable<ITaskConfig> getTasks(Query.Builder query) throws StorageException {
     return Iterables.transform(
-        Storage.Util.consistentFetchTasks(storage, query),
+        Storage.Util.fetchTasks(storage, query),
         Tasks.SCHEDULED_TO_INFO);
   }
 
@@ -91,7 +91,7 @@ public class ResourceCounter {
    * @throws StorageException if there was a problem fetching quotas from storage.
    */
   public Metric computeQuotaAllocationTotals() throws StorageException {
-    return storage.weaklyConsistentRead(new Work.Quiet<Metric>() {
+    return storage.read(new Work.Quiet<Metric>() {
       @Override
       public Metric apply(StoreProvider storeProvider) {
         Metric allocation = new Metric();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index 0d02207..07d81e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -110,17 +110,9 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
   }
 
   @Override
-  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
+  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
     checkInState(State.READY);
-    return wrapped.consistentRead(work);
-  }
-
-  @Override
-  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
-      throws StorageException, E {
-
-    checkInState(State.READY);
-    return wrapped.weaklyConsistentRead(work);
+    return wrapped.read(work);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java b/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java
deleted file mode 100644
index 4e6d68b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage;
-
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.base.Preconditions;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A lock manager that wraps a ReadWriteLock and detects ill-fated attempts to upgrade
- * a read-locked thread to a write-locked thread, which would otherwise deadlock.
- */
-public class ReadWriteLockManager {
-  private final ReentrantReadWriteLock managedLock = new ReentrantReadWriteLock();
-
-  private enum LockMode {
-    NONE,
-    READ,
-    WRITE
-  }
-
-  public enum LockType {
-    READ(LockMode.READ),
-    WRITE(LockMode.WRITE);
-
-    private LockMode mode;
-
-    private LockType(LockMode mode) {
-      this.mode = mode;
-    }
-
-    LockMode getMode() {
-      return mode;
-    }
-  }
-
-  private static class LockState {
-    private LockMode initialLockMode = LockMode.NONE;
-    private int lockCount = 0;
-
-    private boolean lockAcquired(LockMode mode) {
-      boolean stateChanged = false;
-      if (initialLockMode == LockMode.NONE) {
-        initialLockMode = mode;
-        stateChanged = true;
-      }
-      if (initialLockMode.equals(mode)) {
-        lockCount++;
-      }
-      return stateChanged;
-    }
-
-    private void lockReleased(LockMode mode) {
-      if (initialLockMode.equals(mode)) {
-        lockCount--;
-        if (lockCount == 0) {
-          initialLockMode = LockMode.NONE;
-        }
-      }
-    }
-  }
-
-  private final ThreadLocal<LockState> lockState = new ThreadLocal<LockState>() {
-    @Override
-    protected LockState initialValue() {
-      return new LockState();
-    }
-  };
-
-  /**
-   * Blocks until this thread has acquired the requested lock.
-   *
-   * @param type Type of lock to acquire.
-   * @return {@code true} if the lock was newly-acquired, or {@code false} if this thread previously
-   *         secured the lock and has yet to release it.
-   */
-  public boolean lock(LockType type) {
-    requireNonNull(type);
-
-    if (LockType.READ == type) {
-      managedLock.readLock().lock();
-    } else {
-      Preconditions.checkState(lockState.get().initialLockMode != LockMode.READ,
-          "A read operation may not be upgraded to a write operation.");
-
-      managedLock.writeLock().lock();
-    }
-
-    return lockState.get().lockAcquired(type.getMode());
-  }
-
-  /**
-   * Releases this thread's lock of the given type.
-   *
-   * @param type Type of lock to release.
-   */
-  public void unlock(LockType type) {
-    requireNonNull(type);
-
-    if (LockType.READ == type) {
-      managedLock.readLock().unlock();
-    } else {
-      managedLock.writeLock().unlock();
-    }
-
-    lockState.get().lockReleased(type.getMode());
-  }
-
-  /**
-   * Gets an approximation for the number of threads waiting to acquire the read or write lock.
-   *
-   * @see ReentrantReadWriteLock#getQueueLength()
-   * @return The estimated number of threads waiting for this lock.
-   */
-  public int getQueueLength() {
-    return managedLock.getQueueLength();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 682bca8..42233e5 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -22,7 +22,7 @@ import javax.inject.Qualifier;
 
 import com.google.common.collect.ImmutableSet;
 
-import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Query.Builder;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
@@ -175,8 +175,12 @@ public interface Storage {
   }
 
   /**
-   * Executes the unit of read-only {@code work}.  All data in the stores may be expected to be
-   * consistent, as the invocation is mutually exclusive of any writes.
+   * Executes the unit of read-only {@code work}.  The consistency model creates the possibility
+   * for a reader to read uncommitted state from a concurrent writer.
+   * <p>
+   * TODO(wfarner): Update this documentation once all stores are backed by
+   * {@link org.apache.aurora.scheduler.storage.db.DbStorage}, as the concurrency behavior will then
+   * be dictated by the {@link org.mybatis.guice.transactional.Transactional#isolation()} used.
    *
    * @param work The unit of work to execute.
    * @param <T> The type of result this unit of work produces.
@@ -185,21 +189,7 @@ public interface Storage {
    * @throws StorageException if there was a problem reading from stable storage.
    * @throws E bubbled transparently when the unit of work throws
    */
-  <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E;
-
-  /**
-   * Executes a unit of read-only {@code work}.  This is functionally identical to
-   * {@link #consistentRead(Work)} with the exception that data in the stores may not be fully
-   * consistent.
-   *
-   * @param work The unit of work to execute.
-   * @param <T> The type of result this unit of work produces.
-   * @param <E> The type of exception this unit of work can throw.
-   * @return the result when the unit of work completes successfully
-   * @throws StorageException if there was a problem reading from stable storage.
-   * @throws E bubbled transparently when the unit of work throws
-   */
-  <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work) throws StorageException, E;
+  <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E;
 
   /**
    * Executes the unit of mutating {@code work}.
@@ -234,7 +224,7 @@ public interface Storage {
      *
      * @param initializationLogic work to perform after this storage system is ready but before
      *     allowing general use of
-     *     {@link #consistentRead}.
+     *     {@link #read}.
      * @throws StorageException if there was a problem starting storage.
      */
     void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException;
@@ -274,37 +264,13 @@ public interface Storage {
      * Fetch tasks matching the query returned by {@code query} from {@code storage} in a
      * read operation.
      *
-     * @see #consistentFetchTasks
+     * @see #fetchTasks
      * @param storage Storage instance to query from.
      * @param query Builder of the query to perform.
      * @return Tasks returned from the query.
      */
-    public static ImmutableSet<IScheduledTask> consistentFetchTasks(
-        Storage storage,
-        final Query.Builder query) {
-
-      return storage.consistentRead(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
-        @Override
-        public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
-          return storeProvider.getTaskStore().fetchTasks(query);
-        }
-      });
-    }
-
-    /**
-     * Identical to {@link #consistentFetchTasks(Storage, Query.Builder)}, but fetches tasks using a
-     * weakly-consistent read operation.
-     *
-     * @see #consistentFetchTasks
-     * @param storage Storage instance to query from.
-     * @param query Builder of the query to perform.
-     * @return Tasks returned from the query.
-     */
-    public static ImmutableSet<IScheduledTask> weaklyConsistentFetchTasks(
-        Storage storage,
-        final Query.Builder query) {
-
-      return storage.weaklyConsistentRead(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
+    public static ImmutableSet<IScheduledTask> fetchTasks(Storage storage, final Builder query) {
+      return storage.read(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
         @Override
         public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
           return storeProvider.getTaskStore().fetchTasks(query);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 503bdfe..2102adb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -88,7 +88,7 @@ interface TemporaryStorage {
 
         @Override
         public Set<IScheduledTask> fetchTasks(final Query.Builder query) {
-          return storage.consistentRead(new Work.Quiet<Set<IScheduledTask>>() {
+          return storage.read(new Work.Quiet<Set<IScheduledTask>>() {
             @Override
             public Set<IScheduledTask> apply(StoreProvider storeProvider) {
               return storeProvider.getTaskStore().fetchTasks(query);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index 40487e5..ea4afb4 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -46,14 +46,13 @@ import static java.util.Objects.requireNonNull;
 
 /**
  * A storage implementation backed by a relational database.
- *
- * <p>Delegates read and write concurrency semantics to the underlying database.
- * In this implementation, {@link #weaklyConsistentRead(Work)} and {@link #consistentRead(Work)}
- * have identical behaviour as they are both annotated by
- * {@link org.mybatis.guice.transactional.Transactional}. This class is currently only
- * partially implemented, with the underlying {@link MutableStoreProvider} only providing
- * a {@link LockStore.Mutable} implementation. It is designed to be a long term replacement
- * for {@link org.apache.aurora.scheduler.storage.mem.MemStorage}.</p>
+ * <p>
+ * Delegates read and write concurrency semantics to the underlying database.
+ * This class is currently only partially implemented, with the underlying
+ * {@link MutableStoreProvider} only providing some, but not all, store implementations. It is
+ * designed to be a long term replacement for
+ * {@link org.apache.aurora.scheduler.storage.mem.MemStorage}.
+ * </p>
  */
 class DbStorage extends AbstractIdleService implements Storage {
 
@@ -123,19 +122,7 @@ class DbStorage extends AbstractIdleService implements Storage {
 
   @Override
   @Transactional
-  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    try {
-      return work.apply(storeProvider);
-    } catch (PersistenceException e) {
-      throw new StorageException(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  @Transactional
-  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
-      throws StorageException, E {
-
+  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
     try {
       return work.apply(storeProvider);
     } catch (PersistenceException e) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 45ea50f..ba1672f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -36,6 +37,7 @@ import com.twitter.common.base.Closure;
 import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.SlidingStats;
 import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
@@ -185,6 +187,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   private final QuotaStore.Mutable writeBehindQuotaStore;
   private final AttributeStore.Mutable writeBehindAttributeStore;
   private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
+  private final ReentrantLock writeLock;
 
   private StreamManager streamManager;
   private final WriteAheadStorage writeAheadStorage;
@@ -196,22 +199,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   private boolean recovered = false;
   private StreamTransaction transaction = null;
 
-  /**
-   * Identifies the grace period to give in-process snapshots and checkpoints to complete during
-   * shutdown.
-   */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
-  @Qualifier
-  public @interface ShutdownGracePeriod { }
-
-  /**
-   * Identifies the interval between snapshots of local storage truncating the log.
-   */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
-  @Qualifier
-  public @interface SnapshotInterval { }
+  private final SlidingStats writerWaitStats =
+      new SlidingStats("log_storage_write_lock_wait", "ns");
 
   /**
    * Identifies a local storage layer that is written to only after first ensuring the write
@@ -229,9 +218,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   LogStorage(
       LogManager logManager,
       ShutdownRegistry shutdownRegistry,
-      @ShutdownGracePeriod Amount<Long, Time> shutdownGracePeriod,
+      Settings settings,
       SnapshotStore<Snapshot> snapshotStore,
-      @SnapshotInterval Amount<Long, Time> snapshotInterval,
       @WriteBehind Storage storage,
       @WriteBehind SchedulerStore.Mutable schedulerStore,
       @WriteBehind JobStore.Mutable jobStore,
@@ -240,12 +228,13 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
       @WriteBehind QuotaStore.Mutable quotaStore,
       @WriteBehind AttributeStore.Mutable attributeStore,
       @WriteBehind JobUpdateStore.Mutable jobUpdateStore,
-      EventSink eventSink) {
+      EventSink eventSink,
+      ReentrantLock writeLock) {
 
     this(logManager,
-        new ScheduledExecutorSchedulingService(shutdownRegistry, shutdownGracePeriod),
+        new ScheduledExecutorSchedulingService(shutdownRegistry, settings.getShutdownGracePeriod()),
         snapshotStore,
-        snapshotInterval,
+        settings.getSnapshotInterval(),
         storage,
         schedulerStore,
         jobStore,
@@ -254,7 +243,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         quotaStore,
         attributeStore,
         jobUpdateStore,
-        eventSink);
+        eventSink,
+        writeLock);
   }
 
   @VisibleForTesting
@@ -271,7 +261,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
       QuotaStore.Mutable quotaStore,
       AttributeStore.Mutable attributeStore,
       JobUpdateStore.Mutable jobUpdateStore,
-      EventSink eventSink) {
+      EventSink eventSink,
+      ReentrantLock writeLock) {
 
     this.logManager = requireNonNull(logManager);
     this.schedulingService = requireNonNull(schedulingService);
@@ -290,6 +281,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
     this.writeBehindQuotaStore = requireNonNull(quotaStore);
     this.writeBehindAttributeStore = requireNonNull(attributeStore);
     this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
+    this.writeLock = requireNonNull(writeLock);
     TransactionManager transactionManager = new TransactionManager() {
       @Override
       public boolean hasActiveTransaction() {
@@ -319,7 +311,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
 
   @VisibleForTesting
   final Map<LogEntry._Fields, Closure<LogEntry>> buildLogEntryReplayActions() {
-
     return ImmutableMap.<LogEntry._Fields, Closure<LogEntry>>builder()
         .put(LogEntry._Fields.SNAPSHOT, new Closure<LogEntry>() {
           @Override
@@ -347,12 +338,12 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
           public void execute(LogEntry item) throws RuntimeException {
             // Nothing to do here
           }
-        }).build();
+        })
+        .build();
   }
 
   @VisibleForTesting
   final Map<Op._Fields, Closure<Op>> buildTransactionReplayActions() {
-
     return ImmutableMap.<Op._Fields, Closure<Op>>builder()
         .put(Op._Fields.SAVE_FRAMEWORK_ID, new Closure<Op>() {
           @Override
@@ -481,9 +472,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
     } catch (IOException e) {
       throw new IllegalStateException("Failed to open the log, cannot continue", e);
     }
-
-    // TODO(John Sirois): start incremental recovery here from the log and do a final recovery
-    // catchup in start after shutting down the incremental syncer.
   }
 
   @Override
@@ -586,11 +574,11 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         Snapshot snapshot = snapshotStore.createSnapshot();
         persist(snapshot);
         LOG.info("Snapshot complete."
-                 + " host attrs: " + snapshot.getHostAttributesSize()
-                 + ", jobs: " + snapshot.getJobsSize()
-                 + ", locks: " + snapshot.getLocksSize()
-                 + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
-                 + ", tasks: " + snapshot.getTasksSize());
+            + " host attrs: " + snapshot.getHostAttributesSize()
+            + ", jobs: " + snapshot.getJobsSize()
+            + ", locks: " + snapshot.getLocksSize()
+            + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
+            + ", tasks: " + snapshot.getTasksSize());
       }
     });
   }
@@ -603,16 +591,9 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
     streamManager.snapshot(snapshot);
   }
 
-  @Override
-  public synchronized <T, E extends Exception> T write(final MutateWork<T, E> work)
+  private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
       throws StorageException, E {
 
-    // We don't want to use the log when recovering from it, we just want to update the underlying
-    // store - so pass mutations straight through to the underlying storage.
-    if (!recovered) {
-      return writeBehindStorage.write(work);
-    }
-
     // The log stream transaction has already been set up so we just need to delegate with our
     // store provider so any mutations performed by work get logged.
     if (transaction != null) {
@@ -643,15 +624,26 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   }
 
   @Override
-  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    return writeBehindStorage.consistentRead(work);
+  public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
+    long waitStart = System.nanoTime();
+    writeLock.lock();
+    try {
+      writerWaitStats.accumulate(System.nanoTime() - waitStart);
+      // We don't want to use the log when recovering from it; we just want to update the underlying
+      // store, so pass mutations straight through to the underlying storage.
+      if (!recovered) {
+        return writeBehindStorage.write(work);
+      }
+
+      return doInTransaction(work);
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
-  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
-      throws StorageException, E {
-
-    return writeBehindStorage.weaklyConsistentRead(work);
+  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
+    return writeBehindStorage.read(work);
   }
 
   @Override
@@ -666,4 +658,25 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
       throw new StorageException("Failed to create a snapshot", e);
     }
   }
+
+  /**
+   * Configuration settings for log storage.
+   */
+  public static class Settings {
+    private final Amount<Long, Time> shutdownGracePeriod;
+    private final Amount<Long, Time> snapshotInterval;
+
+    public Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
+      this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
+      this.snapshotInterval = requireNonNull(snapshotInterval);
+    }
+
+    public Amount<Long, Time> getShutdownGracePeriod() {
+      return shutdownGracePeriod;
+    }
+
+    public Amount<Long, Time> getSnapshotInterval() {
+      return snapshotInterval;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index 73348f3..e515770 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -13,15 +13,12 @@
  */
 package org.apache.aurora.scheduler.storage.log;
 
-import java.lang.annotation.Annotation;
-
 import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
-import com.google.inject.AbstractModule;
-import com.google.inject.Key;
+import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.twitter.common.args.Arg;
@@ -32,10 +29,11 @@ import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.log.LogManager.DeduplicateSnapshots;
 import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import org.apache.aurora.scheduler.storage.log.LogStorage.ShutdownGracePeriod;
-import org.apache.aurora.scheduler.storage.log.LogStorage.SnapshotInterval;
+import org.apache.aurora.scheduler.storage.log.LogStorage.Settings;
 
 import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
 import static org.apache.aurora.scheduler.storage.log.LogManager.DeflateSnapshots;
@@ -45,7 +43,7 @@ import static org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.Snaps
 /**
  * Bindings for scheduler distributed log based storage.
  */
-public class LogStorageModule extends AbstractModule {
+public class LogStorageModule extends PrivateModule {
 
   @CmdLine(name = "dlog_shutdown_grace_period",
            help = "Specifies the maximum time to wait for scheduled checkpoint and snapshot "
@@ -76,8 +74,8 @@ public class LogStorageModule extends AbstractModule {
 
   @Override
   protected void configure() {
-    bindInterval(ShutdownGracePeriod.class, SHUTDOWN_GRACE_PERIOD);
-    bindInterval(SnapshotInterval.class, SNAPSHOT_INTERVAL);
+    bind(Settings.class)
+        .toInstance(new Settings(SHUTDOWN_GRACE_PERIOD.get(), SNAPSHOT_INTERVAL.get()));
 
     bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
         .toInstance(MAX_LOG_ENTRY_SIZE.get());
@@ -88,6 +86,9 @@ public class LogStorageModule extends AbstractModule {
 
     install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class));
     bind(DistributedSnapshotStore.class).to(LogStorage.class);
+    expose(Storage.class);
+    expose(NonVolatileStorage.class);
+    expose(DistributedSnapshotStore.class);
 
     bind(EntrySerializer.class).to(EntrySerializerImpl.class);
     // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
@@ -100,8 +101,4 @@ public class LogStorageModule extends AbstractModule {
         .implement(StreamManager.class, StreamManagerImpl.class)
         .build(StreamManagerFactory.class));
   }
-
-  private void bindInterval(Class<? extends Annotation> key, Arg<Amount<Long, Time>> value) {
-    bind(Key.get(new TypeLiteral<Amount<Long, Time>>() { }, key)).toInstance(value.get());
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 66ff567..ea33037 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -256,7 +256,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
   @Timed("snapshot_create")
   @Override
   public Snapshot createSnapshot() {
-    return storage.consistentRead(new Work.Quiet<Snapshot>() {
+    return storage.read(new Work.Quiet<Snapshot>() {
       @Override
       public Snapshot apply(StoreProvider storeProvider) {
         Snapshot snapshot = new Snapshot();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
index 2cc76dc..bc3a7c5 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
@@ -17,30 +17,22 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Supplier;
-import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.twitter.common.inject.Bindings;
 import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.stats.SlidingStats;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
 
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.JobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.LockStore;
 import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.ReadWriteLockManager;
-import org.apache.aurora.scheduler.storage.ReadWriteLockManager.LockType;
 import org.apache.aurora.scheduler.storage.SchedulerStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.TaskStore;
@@ -50,27 +42,9 @@ import static java.util.Objects.requireNonNull;
 
 /**
  * A storage implementation comprised of individual in-memory store implementations.
- * <p>
- * This storage has a global read-write lock, which is used when invoking
- * {@link #consistentRead(Work)} and {@link #write(MutateWork)}.  However, no locks are used at this
- * level for {@link #weaklyConsistentRead(Work)}. It is the responsibility of the
- * individual stores to ensure that read operations are thread-safe (optimally supporting
- * concurrency).  Store implementations may assume that all methods invoked on {@code Mutable}
- * store interfaces are protected by the global write lock, and thus invoked serially.
  */
 public class MemStorage implements Storage {
-  private final AtomicLong readLockWaitNanos = Stats.exportLong("read_lock_wait_nanos");
-  private final AtomicLong writeLockWaitNanos = Stats.exportLong("write_lock_wait_nanos");
-
-  // We choose to not use the @Timed decorator for these stats since nested transactions are normal
-  // and pollute the stats.
-  private final SlidingStats readStats =
-      new SlidingStats("mem_storage_consistent_read_operation", "nanos");
-  private final SlidingStats writeStats =
-      new SlidingStats("mem_storage_write_operation", "nanos");
-
   private final MutableStoreProvider storeProvider;
-  private final ReadWriteLockManager lockManager = new ReadWriteLockManager();
   private final Storage delegatedStore;
 
   /**
@@ -93,8 +67,7 @@ public class MemStorage implements Storage {
       @Delegated final Storage delegated,
       @Delegated final QuotaStore.Mutable quotaStore,
       @Delegated final AttributeStore.Mutable attributeStore,
-      @Delegated final JobUpdateStore.Mutable updateStore,
-      StatsProvider statsProvider) {
+      @Delegated final JobUpdateStore.Mutable updateStore) {
 
     this.delegatedStore = requireNonNull(delegated);
     storeProvider = new MutableStoreProvider() {
@@ -138,91 +111,40 @@ public class MemStorage implements Storage {
         return updateStore;
       }
     };
-
-    statsProvider.makeGauge(
-        THREADS_WAITING_GAUGE,
-        new Supplier<Integer>() {
-          @Override
-          public Integer get() {
-            return lockManager.getQueueLength();
-          }
-        }
-    );
   }
 
   /**
-   * Creates a new empty in-memory storage for use in testing, exporting gauges to the provided
-   * stats provider.  NOTE: Due to the fact that some libraries statically access {@link Stats},
-   * not all guaranteed to use the stats provider.
+   * Creates a new empty in-memory storage for use in testing.
    */
   @VisibleForTesting
-  public static Storage newEmptyStorage(final StatsProvider statsProvider) {
+  public static Storage newEmptyStorage() {
     Injector injector = Guice.createInjector(
         DbModule.testModule(Bindings.annotatedKeyFactory(Delegated.class)),
-        new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
-        new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(StatsProvider.class).toInstance(statsProvider);
-          }
-        });
-
+        new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)));
     Storage storage = injector.getInstance(Key.get(Storage.class, Volatile.class));
     storage.prepare();
     return storage;
   }
 
-  /**
-   * Creates a new empty in-memory storage for use in testing.
-   */
-  @VisibleForTesting
-  public static Storage newEmptyStorage() {
-    return newEmptyStorage(Stats.STATS_PROVIDER);
-  }
-
-  private <S extends StoreProvider, T, E extends Exception> T doWork(
-      LockType lockType,
-      S stores,
-      StorageOperation<S, T, E> work,
-      SlidingStats stats,
-      AtomicLong lockWaitStat) throws StorageException, E {
-
-    requireNonNull(work);
-
-    // Perform the work, and only record stats for top-level transactions.  This prevents
-    // over-counting when nested transactions are performed.
-    long lockStartNanos = System.nanoTime();
-    boolean topLevelOperation = lockManager.lock(lockType);
-    if (topLevelOperation) {
-      lockWaitStat.addAndGet(System.nanoTime() - lockStartNanos);
-    }
-    try {
-      return work.apply(stores);
-    } finally {
-      lockManager.unlock(lockType);
-      if (topLevelOperation) {
-        stats.accumulate(System.nanoTime() - lockStartNanos);
-      }
-    }
-  }
-
+  @Timed("mem_storage_read_operation")
   @Override
-  public <T, E extends Exception> T consistentRead(final Work<T, E> work)
+  public <T, E extends Exception> T read(final Work<T, E> work)
           throws StorageException, E {
-    return delegatedStore.consistentRead(new Work<T, E>() {
+    return delegatedStore.read(new Work<T, E>() {
       @Override
       public T apply(StoreProvider provider) throws E {
-        return doWork(LockType.READ, storeProvider, work, readStats, readLockWaitNanos);
+        return work.apply(storeProvider);
       }
     });
   }
 
+  @Timed("mem_storage_write_operation")
   @Override
   public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
     return delegatedStore.write(new MutateWork<T, E>() {
       @Override
       public T apply(MutableStoreProvider provider) throws E {
-       return doWork(LockType.WRITE, storeProvider, work, writeStats, writeLockWaitNanos);
+        return work.apply(storeProvider);
       }
     });
   }
@@ -231,18 +153,4 @@ public class MemStorage implements Storage {
   public void prepare() throws StorageException {
     delegatedStore.prepare();
   }
-
-  @Timed("mem_storage_weakly_consistent_read_operation")
-  @Override
-  public <T, E extends Exception> T weaklyConsistentRead(final Work<T, E> work)
-      throws StorageException, E {
-
-    return delegatedStore.weaklyConsistentRead(new Work<T, E>() {
-      @Override
-      public T apply(StoreProvider provider) throws E {
-        return work.apply(storeProvider);
-      }
-    });
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index b66b916..a5e869f 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -497,9 +497,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   private List<ScheduledTask> getTasks(TaskQuery query) {
     requireNonNull(query);
 
-    Iterable<IScheduledTask> tasks =
-        Storage.Util.weaklyConsistentFetchTasks(storage, Query.arbitrary(query));
-
+    Iterable<IScheduledTask> tasks = Storage.Util.fetchTasks(storage, Query.arbitrary(query));
     if (query.isSetOffset()) {
       tasks = Iterables.skip(tasks, query.getOffset());
     }
@@ -558,7 +556,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     IJobKey jobKey = JobKeys.assertValid(IJobKey.build(job));
 
     Set<IScheduledTask> activeTasks =
-        Storage.Util.weaklyConsistentFetchTasks(storage, Query.jobScoped(jobKey).active());
+        Storage.Util.fetchTasks(storage, Query.jobScoped(jobKey).active());
 
     Iterable<IAssignedTask> assignedTasks =
         Iterables.transform(activeTasks, Tasks.SCHEDULED_TO_ASSIGNED);
@@ -578,7 +576,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   @Override
   public Response getRoleSummary() {
     Multimap<String, IJobKey> jobsByRole = mapByRole(
-        Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped()),
+        Storage.Util.fetchTasks(storage, Query.unscoped()),
         Tasks.SCHEDULED_TO_JOB_KEY);
 
     Multimap<String, IJobKey> cronJobsByRole = mapByRole(
@@ -674,7 +672,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   }
 
   private Multimap<IJobKey, IScheduledTask> getTasks(Query.Builder query) {
-    return Tasks.byJobKey(Storage.Util.weaklyConsistentFetchTasks(storage, query));
+    return Tasks.byJobKey(Storage.Util.fetchTasks(storage, query));
   }
 
   private static <T> Multimap<String, IJobKey> mapByRole(
@@ -1538,7 +1536,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     final IJobUpdateQuery query = IJobUpdateQuery.build(requireNonNull(mutableQuery));
     return okResponse(Result.getJobUpdateSummariesResult(
         new GetJobUpdateSummariesResult().setUpdateSummaries(IJobUpdateSummary.toBuildersList(
-            storage.weaklyConsistentRead(new Work.Quiet<List<IJobUpdateSummary>>() {
+            storage.read(new Work.Quiet<List<IJobUpdateSummary>>() {
               @Override
               public List<IJobUpdateSummary> apply(StoreProvider storeProvider) {
                 return storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(query);
@@ -1550,7 +1548,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   public Response getJobUpdateDetails(final String updateId) {
     requireNonNull(updateId);
     Optional<IJobUpdateDetails> details =
-        storage.weaklyConsistentRead(new Work.Quiet<Optional<IJobUpdateDetails>>() {
+        storage.read(new Work.Quiet<Optional<IJobUpdateDetails>>() {
           @Override
           public Optional<IJobUpdateDetails> apply(StoreProvider storeProvider) {
             return storeProvider.getJobUpdateStore().fetchJobUpdateDetails(updateId);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
index 5cc3b41..3336f8c 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
@@ -56,14 +56,7 @@ class FakeNonVolatileStorage implements NonVolatileStorage {
   }
 
   @Override
-  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
-      throws StorageException, E {
-
-    return delegate.weaklyConsistentRead(work);
-  }
-
-  @Override
-  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    return delegate.consistentRead(work);
+  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
+    return delegate.read(work);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
index d56c36c..934e9bb 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
@@ -211,7 +211,7 @@ public class CronJobManagerImplTest extends EasyMockTest {
   }
 
   private Optional<IJobConfiguration> fetchFromStorage() {
-    return storage.consistentRead(new Storage.Work.Quiet<Optional<IJobConfiguration>>() {
+    return storage.read(new Storage.Work.Quiet<Optional<IJobConfiguration>>() {
       @Override
       public Optional<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
         return storeProvider.getJobStore().fetchJob(cronJobManager.getManagerKey(),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index b60dd9c..265c38d 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -43,14 +43,9 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
 import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
@@ -103,35 +98,15 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private final AtomicLong taskIdCounter = new AtomicLong();
 
   private SchedulingFilter defaultFilter;
-  private Storage storage;
-  private StoreProvider storeProvider;
   private AttributeStore.Mutable attributeStore;
 
   @Before
   public void setUp() {
-    storage = createMock(Storage.class);
     defaultFilter = new SchedulingFilterImpl();
-    storeProvider = createMock(StoreProvider.class);
     attributeStore = createMock(AttributeStore.Mutable.class);
     emptyJob = new AttributeAggregate(
         Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
         attributeStore);
-
-    // Link the store provider to the store mocks.
-    expectReads();
-
-    expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
-  }
-
-  private void expectReads() {
-    expect(storage.weaklyConsistentRead(EasyMock.<Quiet<Object>>anyObject()))
-        .andAnswer(new IAnswer<Object>() {
-          @Override
-          public Object answer() {
-            Quiet<?> arg = (Quiet<?>) EasyMock.getCurrentArguments()[0];
-            return arg.apply(storeProvider);
-          }
-        }).anyTimes();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
index 953fb9e..2f14205 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
@@ -173,7 +173,7 @@ public class LockManagerImplTest extends EasyMockTest {
 
     final CountDownLatch reads = new CountDownLatch(2);
     EasyMock.makeThreadSafe(storageUtil.storage, false);
-    expect(storageUtil.storage.consistentRead(EasyMock.<Work<Object, ?>>anyObject()))
+    expect(storageUtil.storage.read(EasyMock.<Work<Object, ?>>anyObject()))
         .andAnswer(new IAnswer<Object>() {
           @Override
           public Object answer() throws Throwable {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 157921c..03b1c9b 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -183,7 +183,7 @@ public class StateManagerImplTest extends EasyMockTest {
             .setTaskId(taskId)
             .setTask(task.newBuilder()));
     assertEquals(ImmutableSet.of(IScheduledTask.build(expected)),
-        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId)));
+        Storage.Util.fetchTasks(storage, Query.taskScoped(taskId)));
   }
 
   @Test
@@ -332,7 +332,7 @@ public class StateManagerImplTest extends EasyMockTest {
     changeState(taskId, RUNNING);
     changeState(taskId, FAILED);
     IScheduledTask rescheduledTask = Iterables.getOnlyElement(
-        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId2)));
+        Storage.Util.fetchTasks(storage, Query.taskScoped(taskId2)));
     assertEquals(taskId, rescheduledTask.getAncestorId());
     assertEquals(1, rescheduledTask.getFailureCount());
   }
@@ -410,7 +410,7 @@ public class StateManagerImplTest extends EasyMockTest {
     assignTask(taskId, HOST_A, ImmutableSet.of(80, 81, 82));
 
     IScheduledTask actual = Iterables.getOnlyElement(
-        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId)));
+        Storage.Util.fetchTasks(storage, Query.taskScoped(taskId)));
 
     assertEquals(
         requestedPorts,
@@ -442,7 +442,7 @@ public class StateManagerImplTest extends EasyMockTest {
     assignTask(newTaskId, HOST_A, ImmutableSet.of(86));
 
     IScheduledTask actual = Iterables.getOnlyElement(
-        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(newTaskId)));
+        Storage.Util.fetchTasks(storage, Query.taskScoped(newTaskId)));
 
     assertEquals(ImmutableMap.of("one", 86), actual.getAssignedTask().getAssignedPorts());
   }
@@ -471,7 +471,7 @@ public class StateManagerImplTest extends EasyMockTest {
     control.replay();
 
     insertTask(task, 0);
-    Iterables.getOnlyElement(Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId)));
+    Iterables.getOnlyElement(Storage.Util.fetchTasks(storage, Query.taskScoped(taskId)));
 
     insertTask(task, 0);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/ReadWriteLockManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/ReadWriteLockManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/ReadWriteLockManagerTest.java
deleted file mode 100644
index b63ff3e..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/ReadWriteLockManagerTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.google.common.testing.TearDown;
-import com.google.common.testing.junit4.TearDownTestCase;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.scheduler.storage.ReadWriteLockManager.LockType.READ;
-import static org.apache.aurora.scheduler.storage.ReadWriteLockManager.LockType.WRITE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class ReadWriteLockManagerTest extends TearDownTestCase {
-
-  private ReadWriteLockManager lockManager;
-  private ExecutorService executor;
-
-  @Before
-  public void setUp() {
-    lockManager = new ReadWriteLockManager();
-    executor = Executors.newCachedThreadPool(
-        new ThreadFactoryBuilder().setNameFormat("LockManagerTest-%d").setDaemon(true).build());
-    addTearDown(new TearDown() {
-      @Override
-      public void tearDown() {
-        new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
-      }
-    });
-  }
-
-  @Test
-  public void testModeDowngrade() {
-    lockManager.lock(WRITE);
-    lockManager.lock(READ);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testModeUpgrade() {
-    lockManager.lock(READ);
-    lockManager.lock(WRITE);
-  }
-
-  @Test
-  public void testSimultaneousReads() throws Exception {
-    final CountDownLatch slowReadStarted = new CountDownLatch(1);
-    final CountDownLatch fastReadFinished = new CountDownLatch(1);
-
-    Future<String> slowReadResult = executor.submit(new Callable<String>() {
-      @Override
-      public String call() throws Exception {
-        lockManager.lock(READ);
-        slowReadStarted.countDown();
-        fastReadFinished.await();
-        lockManager.unlock(READ);
-        return "slow";
-      }
-    });
-
-    slowReadStarted.await();
-    lockManager.lock(READ);
-    lockManager.unlock(READ);
-    fastReadFinished.countDown();
-    assertEquals("slow", slowReadResult.get());
-  }
-
-  @Test
-  public void testReentrantReadLock() {
-    assertTrue(lockManager.lock(READ));
-    assertFalse(lockManager.lock(READ));
-    lockManager.unlock(READ);
-    lockManager.unlock(READ);
-    assertTrue(lockManager.lock(READ));
-  }
-
-  @Test
-  public void testReentrantWriteLock() {
-    assertTrue(lockManager.lock(WRITE));
-    assertFalse(lockManager.lock(WRITE));
-    lockManager.unlock(WRITE);
-    lockManager.unlock(WRITE);
-    assertTrue(lockManager.lock(WRITE));
-    assertFalse(lockManager.lock(READ));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
index 2514d84..64c1b7f 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
@@ -185,7 +185,7 @@ public class StorageBackfillTest {
     backfill();
 
     IJobConfiguration actual = Iterables.getOnlyElement(
-        storage.consistentRead(new Storage.Work.Quiet<Iterable<IJobConfiguration>>() {
+        storage.read(new Storage.Work.Quiet<Iterable<IJobConfiguration>>() {
           @Override
           public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
             return storeProvider.getJobStore().fetchJobs("CRON");
@@ -235,7 +235,7 @@ public class StorageBackfillTest {
         ImmutableSet.of(
             IScheduledTask.build(noJobKeyBackfilled),
             IScheduledTask.build(nullJobKeyFieldsBackfilled)),
-        Storage.Util.consistentFetchTasks(storage, Query.unscoped()));
+        Storage.Util.fetchTasks(storage, Query.unscoped()));
   }
 
   private void backfill() {
@@ -280,12 +280,12 @@ public class StorageBackfillTest {
   }
 
   private IScheduledTask getTask(String taskId) {
-    return Iterables.getOnlyElement(Storage.Util.consistentFetchTasks(
+    return Iterables.getOnlyElement(Storage.Util.fetchTasks(
         storage,
         Query.taskScoped(taskId)));
   }
 
   private Set<IScheduledTask> getTasksByStatus(ScheduleStatus status) {
-    return Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(status));
+    return Storage.Util.fetchTasks(storage, Query.unscoped().byStatus(status));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
index c0ccfaa..ca7c0c2 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
@@ -658,7 +658,7 @@ public class DBJobUpdateStoreTest {
   }
 
   private Optional<IJobUpdate> getUpdate(final String updateId) {
-    return storage.consistentRead(new Quiet<Optional<IJobUpdate>>() {
+    return storage.read(new Quiet<Optional<IJobUpdate>>() {
       @Override
       public Optional<IJobUpdate> apply(Storage.StoreProvider storeProvider) {
         return storeProvider.getJobUpdateStore().fetchJobUpdate(updateId);
@@ -667,7 +667,7 @@ public class DBJobUpdateStoreTest {
   }
 
   private List<IJobInstanceUpdateEvent> getInstanceEvents(final String updateId, final int id) {
-    return storage.consistentRead(new Quiet<List<IJobInstanceUpdateEvent>>() {
+    return storage.read(new Quiet<List<IJobInstanceUpdateEvent>>() {
       @Override
       public List<IJobInstanceUpdateEvent> apply(Storage.StoreProvider storeProvider) {
         return storeProvider.getJobUpdateStore().fetchInstanceEvents(updateId, id);
@@ -676,7 +676,7 @@ public class DBJobUpdateStoreTest {
   }
 
   private Optional<IJobUpdateInstructions> getUpdateInstructions(final String updateId) {
-    return storage.consistentRead(new Quiet<Optional<IJobUpdateInstructions>>() {
+    return storage.read(new Quiet<Optional<IJobUpdateInstructions>>() {
       @Override
       public Optional<IJobUpdateInstructions> apply(Storage.StoreProvider storeProvider) {
         return storeProvider.getJobUpdateStore().fetchJobUpdateInstructions(updateId);
@@ -685,7 +685,7 @@ public class DBJobUpdateStoreTest {
   }
 
   private Optional<IJobUpdateDetails> getUpdateDetails(final String updateId) {
-    return storage.consistentRead(new Quiet<Optional<IJobUpdateDetails>>() {
+    return storage.read(new Quiet<Optional<IJobUpdateDetails>>() {
       @Override
       public Optional<IJobUpdateDetails> apply(Storage.StoreProvider storeProvider) {
         return storeProvider.getJobUpdateStore().fetchJobUpdateDetails(updateId);
@@ -694,7 +694,7 @@ public class DBJobUpdateStoreTest {
   }
 
   private Set<StoredJobUpdateDetails> getAllUpdateDetails() {
-    return storage.consistentRead(new Quiet<Set<StoredJobUpdateDetails>>() {
+    return storage.read(new Quiet<Set<StoredJobUpdateDetails>>() {
       @Override
       public Set<StoredJobUpdateDetails> apply(Storage.StoreProvider storeProvider) {
         return storeProvider.getJobUpdateStore().fetchAllJobUpdateDetails();
@@ -703,7 +703,7 @@ public class DBJobUpdateStoreTest {
   }
 
   private List<IJobUpdateSummary> getSummaries(final JobUpdateQuery query) {
-    return storage.consistentRead(new Quiet<List<IJobUpdateSummary>>() {
+    return storage.read(new Quiet<List<IJobUpdateSummary>>() {
       @Override
       public List<IJobUpdateSummary> apply(Storage.StoreProvider storeProvider) {
         return storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/db/DbAttributeStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbAttributeStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbAttributeStoreTest.java
index d659658..db508ca 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbAttributeStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbAttributeStoreTest.java
@@ -140,7 +140,7 @@ public class DbAttributeStoreTest {
   }
 
   private Optional<IHostAttributes> read(final String host) {
-    return storage.consistentRead(new Work.Quiet<Optional<IHostAttributes>>() {
+    return storage.read(new Work.Quiet<Optional<IHostAttributes>>() {
       @Override
       public Optional<IHostAttributes> apply(StoreProvider storeProvider) {
         return storeProvider.getAttributeStore().getHostAttributes(host);
@@ -149,7 +149,7 @@ public class DbAttributeStoreTest {
   }
 
   private Set<IHostAttributes> readAll() {
-    return storage.consistentRead(new Work.Quiet<Set<IHostAttributes>>() {
+    return storage.read(new Work.Quiet<Set<IHostAttributes>>() {
       @Override
       public Set<IHostAttributes> apply(StoreProvider storeProvider) {
         return storeProvider.getAttributeStore().getHostAttributes();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
index e9b210f..d6140f5 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
@@ -44,7 +44,7 @@ public class DbLockStoreTest {
   private void assertLocks(final ILock... expected) {
     assertEquals(
         ImmutableSet.<ILock>builder().add(expected).build(),
-        storage.consistentRead(new Quiet<Set<ILock>>() {
+        storage.read(new Quiet<Set<ILock>>() {
           @Override
           public Set<ILock> apply(Storage.StoreProvider storeProvider) {
             return storeProvider.getLockStore().fetchLocks();
@@ -53,7 +53,7 @@ public class DbLockStoreTest {
   }
 
   private Optional<ILock> getLock(final ILockKey key) {
-    return storage.consistentRead(new Quiet<Optional<ILock>>() {
+    return storage.read(new Quiet<Optional<ILock>>() {
       @Override
       public Optional<ILock> apply(StoreProvider storeProvider) {
         return storeProvider.getLockStore().fetchLock(key);